rbd: get rid of some version parameters
[linux-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
f0f8cef5
AE
55#define RBD_DRV_NAME "rbd"
56#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
57
58#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
59
d4b125e9
AE
60#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61#define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
35d489f9 64#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
65
66#define RBD_SNAP_HEAD_NAME "-"
67
9e15b77d
AE
68/* This allows a single page to hold an image name sent by OSD */
69#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 70#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 71
1e130199 72#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 73
d889140c
AE
74/* Feature bits */
75
5cbf6f12
AE
76#define RBD_FEATURE_LAYERING (1<<0)
77#define RBD_FEATURE_STRIPINGV2 (1<<1)
78#define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
d889140c
AE
80
81/* Features supported by this (client software) implementation. */
82
770eba6e 83#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
d889140c 84
81a89793
AE
85/*
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
90 */
602adf40 91#define DEV_NAME_LEN 32
81a89793 92#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40
YS
93
94/*
95 * block device image metadata (in-memory version)
96 */
97struct rbd_image_header {
f84344f3 98 /* These four fields never change for a given rbd image */
849b4260 99 char *object_prefix;
34b13184 100 u64 features;
602adf40
YS
101 __u8 obj_order;
102 __u8 crypt_type;
103 __u8 comp_type;
602adf40 104
f84344f3
AE
105 /* The remaining fields need to be updated occasionally */
106 u64 image_size;
107 struct ceph_snap_context *snapc;
602adf40
YS
108 char *snap_names;
109 u64 *snap_sizes;
59c2be1e 110
500d0c0f
AE
111 u64 stripe_unit;
112 u64 stripe_count;
59c2be1e
YS
113};
114
0d7dbfce
AE
115/*
116 * An rbd image specification.
117 *
118 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
c66c6e0c
AE
119 * identify an image. Each rbd_dev structure includes a pointer to
120 * an rbd_spec structure that encapsulates this identity.
121 *
122 * Each of the id's in an rbd_spec has an associated name. For a
123 * user-mapped image, the names are supplied and the id's associated
124 * with them are looked up. For a layered image, a parent image is
125 * defined by the tuple, and the names are looked up.
126 *
127 * An rbd_dev structure contains a parent_spec pointer which is
128 * non-null if the image it represents is a child in a layered
129 * image. This pointer will refer to the rbd_spec structure used
130 * by the parent rbd_dev for its own identity (i.e., the structure
131 * is shared between the parent and child).
132 *
133 * Since these structures are populated once, during the discovery
134 * phase of image construction, they are effectively immutable so
135 * we make no effort to synchronize access to them.
136 *
137 * Note that code herein does not assume the image name is known (it
138 * could be a null pointer).
0d7dbfce
AE
139 */
140struct rbd_spec {
141 u64 pool_id;
ecb4dc22 142 const char *pool_name;
0d7dbfce 143
ecb4dc22
AE
144 const char *image_id;
145 const char *image_name;
0d7dbfce
AE
146
147 u64 snap_id;
ecb4dc22 148 const char *snap_name;
0d7dbfce
AE
149
150 struct kref kref;
151};
152
602adf40 153/*
f0f8cef5 154 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
155 */
156struct rbd_client {
157 struct ceph_client *client;
158 struct kref kref;
159 struct list_head node;
160};
161
bf0d5f50
AE
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

/* Sentinel "which" value for object requests not on an image list */
#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* How an object request carries its data payload */
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

/* Bits in rbd_obj_request->flags */
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
180
bf0d5f50
AE
181struct rbd_obj_request {
182 const char *object_name;
183 u64 offset; /* object start byte */
184 u64 length; /* bytes from offset */
926f9b3f 185 unsigned long flags;
bf0d5f50 186
c5b5ef6c
AE
187 /*
188 * An object request associated with an image will have its
189 * img_data flag set; a standalone object request will not.
190 *
191 * A standalone object request will have which == BAD_WHICH
192 * and a null obj_request pointer.
193 *
194 * An object request initiated in support of a layered image
195 * object (to check for its existence before a write) will
196 * have which == BAD_WHICH and a non-null obj_request pointer.
197 *
198 * Finally, an object request for rbd image data will have
199 * which != BAD_WHICH, and will have a non-null img_request
200 * pointer. The value of which will be in the range
201 * 0..(img_request->obj_request_count-1).
202 */
203 union {
204 struct rbd_obj_request *obj_request; /* STAT op */
205 struct {
206 struct rbd_img_request *img_request;
207 u64 img_offset;
208 /* links for img_request->obj_requests list */
209 struct list_head links;
210 };
211 };
bf0d5f50
AE
212 u32 which; /* posn image request list */
213
214 enum obj_request_type type;
788e2df3
AE
215 union {
216 struct bio *bio_list;
217 struct {
218 struct page **pages;
219 u32 page_count;
220 };
221 };
0eefd470 222 struct page **copyup_pages;
bf0d5f50
AE
223
224 struct ceph_osd_request *osd_req;
225
226 u64 xferred; /* bytes transferred */
227 u64 version;
1b83bef2 228 int result;
bf0d5f50
AE
229
230 rbd_obj_callback_t callback;
788e2df3 231 struct completion completion;
bf0d5f50
AE
232
233 struct kref kref;
234};
235
/* Bits in rbd_img_request->flags */
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
241
bf0d5f50 242struct rbd_img_request {
bf0d5f50
AE
243 struct rbd_device *rbd_dev;
244 u64 offset; /* starting image byte offset */
245 u64 length; /* byte count from offset */
0c425248 246 unsigned long flags;
bf0d5f50 247 union {
9849e986 248 u64 snap_id; /* for reads */
bf0d5f50 249 struct ceph_snap_context *snapc; /* for writes */
9849e986
AE
250 };
251 union {
252 struct request *rq; /* block request */
253 struct rbd_obj_request *obj_request; /* obj req initiator */
bf0d5f50 254 };
3d7efd18 255 struct page **copyup_pages;
bf0d5f50
AE
256 spinlock_t completion_lock;/* protects next_completion */
257 u32 next_completion;
258 rbd_img_callback_t callback;
55f27e09 259 u64 xferred;/* aggregate bytes transferred */
a5a337d4 260 int result; /* first nonzero obj_request result */
bf0d5f50
AE
261
262 u32 obj_request_count;
263 struct list_head obj_requests; /* rbd_obj_request structs */
264
265 struct kref kref;
266};
267
/* Iterators over an image request's list of object requests */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
bf0d5f50 274
dfc5606d 275struct rbd_snap {
dfc5606d 276 const char *name;
3591538f 277 u64 size;
dfc5606d
YS
278 struct list_head node;
279 u64 id;
34b13184 280 u64 features;
dfc5606d
YS
281};
282
f84344f3 283struct rbd_mapping {
99c1f08f 284 u64 size;
34b13184 285 u64 features;
f84344f3
AE
286 bool read_only;
287};
288
602adf40
YS
289/*
290 * a single device
291 */
292struct rbd_device {
de71a297 293 int dev_id; /* blkdev unique id */
602adf40
YS
294
295 int major; /* blkdev assigned major */
296 struct gendisk *disk; /* blkdev's gendisk and rq */
602adf40 297
a30b71b9 298 u32 image_format; /* Either 1 or 2 */
602adf40
YS
299 struct rbd_client *rbd_client;
300
301 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
302
b82d167b 303 spinlock_t lock; /* queue, flags, open_count */
602adf40
YS
304
305 struct rbd_image_header header;
b82d167b 306 unsigned long flags; /* possibly lock protected */
0d7dbfce 307 struct rbd_spec *spec;
602adf40 308
0d7dbfce 309 char *header_name;
971f839a 310
0903e875
AE
311 struct ceph_file_layout layout;
312
59c2be1e 313 struct ceph_osd_event *watch_event;
975241af 314 struct rbd_obj_request *watch_request;
59c2be1e 315
86b00e0d
AE
316 struct rbd_spec *parent_spec;
317 u64 parent_overlap;
2f82ee54 318 struct rbd_device *parent;
86b00e0d 319
c666601a
JD
320 /* protects updating the header */
321 struct rw_semaphore header_rwsem;
f84344f3
AE
322
323 struct rbd_mapping mapping;
602adf40
YS
324
325 struct list_head node;
dfc5606d
YS
326
327 /* list of snapshots */
328 struct list_head snaps;
329
330 /* sysfs related */
331 struct device dev;
b82d167b 332 unsigned long open_count; /* protected by lock */
dfc5606d
YS
333};
334
b82d167b
AE
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
346
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for routines defined later in this file */
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);
dfc5606d 361
f0f8cef5
AE
362static ssize_t rbd_add(struct bus_type *bus, const char *buf,
363 size_t count);
364static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
365 size_t count);
71f293e2 366static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
f0f8cef5
AE
367
368static struct bus_attribute rbd_bus_attrs[] = {
369 __ATTR(add, S_IWUSR, NULL, rbd_add),
370 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
371 __ATTR_NULL
372};
373
374static struct bus_type rbd_bus_type = {
375 .name = "rbd",
376 .bus_attrs = rbd_bus_attrs,
377};
378
379static void rbd_root_dev_release(struct device *dev)
380{
381}
382
383static struct device rbd_root_dev = {
384 .init_name = "rbd",
385 .release = rbd_root_dev_release,
386};
387
06ecc6cb
AE
388static __printf(2, 3)
389void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
390{
391 struct va_format vaf;
392 va_list args;
393
394 va_start(args, fmt);
395 vaf.fmt = fmt;
396 vaf.va = &args;
397
398 if (!rbd_dev)
399 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
400 else if (rbd_dev->disk)
401 printk(KERN_WARNING "%s: %s: %pV\n",
402 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
403 else if (rbd_dev->spec && rbd_dev->spec->image_name)
404 printk(KERN_WARNING "%s: image %s: %pV\n",
405 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
406 else if (rbd_dev->spec && rbd_dev->spec->image_id)
407 printk(KERN_WARNING "%s: id %s: %pV\n",
408 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
409 else /* punt */
410 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
411 RBD_DRV_NAME, rbd_dev, &vaf);
412 va_end(args);
413}
414
aafb230e
AE
#ifdef RBD_DEBUG
/*
 * Verify an assumption at runtime; log the failing expression and
 * BUG() if it does not hold.  Wrapped in do { } while (0) so the
 * macro behaves as a single statement and is safe in unbraced
 * if/else bodies (the bare-if form had a dangling-else hazard).
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
dfc5606d 427
b454e36d 428static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
05a46afd
AE
429static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
430static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
8b3e1a56 431
117973fb
AE
432static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
433static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 434
602adf40
YS
435static int rbd_open(struct block_device *bdev, fmode_t mode)
436{
f0f8cef5 437 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
b82d167b 438 bool removing = false;
602adf40 439
f84344f3 440 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
602adf40
YS
441 return -EROFS;
442
a14ea269 443 spin_lock_irq(&rbd_dev->lock);
b82d167b
AE
444 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445 removing = true;
446 else
447 rbd_dev->open_count++;
a14ea269 448 spin_unlock_irq(&rbd_dev->lock);
b82d167b
AE
449 if (removing)
450 return -ENOENT;
451
42382b70 452 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
c3e946ce 453 (void) get_device(&rbd_dev->dev);
f84344f3 454 set_device_ro(bdev, rbd_dev->mapping.read_only);
42382b70 455 mutex_unlock(&ctl_mutex);
340c7a2b 456
602adf40
YS
457 return 0;
458}
459
dfc5606d
YS
460static int rbd_release(struct gendisk *disk, fmode_t mode)
461{
462 struct rbd_device *rbd_dev = disk->private_data;
b82d167b
AE
463 unsigned long open_count_before;
464
a14ea269 465 spin_lock_irq(&rbd_dev->lock);
b82d167b 466 open_count_before = rbd_dev->open_count--;
a14ea269 467 spin_unlock_irq(&rbd_dev->lock);
b82d167b 468 rbd_assert(open_count_before > 0);
dfc5606d 469
42382b70 470 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
c3e946ce 471 put_device(&rbd_dev->dev);
42382b70 472 mutex_unlock(&ctl_mutex);
dfc5606d
YS
473
474 return 0;
475}
476
602adf40
YS
477static const struct block_device_operations rbd_bd_ops = {
478 .owner = THIS_MODULE,
479 .open = rbd_open,
dfc5606d 480 .release = rbd_release,
602adf40
YS
481};
482
483/*
484 * Initialize an rbd client instance.
43ae4701 485 * We own *ceph_opts.
602adf40 486 */
f8c38929 487static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
488{
489 struct rbd_client *rbdc;
490 int ret = -ENOMEM;
491
37206ee5 492 dout("%s:\n", __func__);
602adf40
YS
493 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494 if (!rbdc)
495 goto out_opt;
496
497 kref_init(&rbdc->kref);
498 INIT_LIST_HEAD(&rbdc->node);
499
bc534d86
AE
500 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
43ae4701 502 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 503 if (IS_ERR(rbdc->client))
bc534d86 504 goto out_mutex;
43ae4701 505 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
506
507 ret = ceph_open_session(rbdc->client);
508 if (ret < 0)
509 goto out_err;
510
432b8587 511 spin_lock(&rbd_client_list_lock);
602adf40 512 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 513 spin_unlock(&rbd_client_list_lock);
602adf40 514
bc534d86 515 mutex_unlock(&ctl_mutex);
37206ee5 516 dout("%s: rbdc %p\n", __func__, rbdc);
bc534d86 517
602adf40
YS
518 return rbdc;
519
520out_err:
521 ceph_destroy_client(rbdc->client);
bc534d86
AE
522out_mutex:
523 mutex_unlock(&ctl_mutex);
602adf40
YS
524 kfree(rbdc);
525out_opt:
43ae4701
AE
526 if (ceph_opts)
527 ceph_destroy_options(ceph_opts);
37206ee5
AE
528 dout("%s: error %d\n", __func__, ret);
529
28f259b7 530 return ERR_PTR(ret);
602adf40
YS
531}
532
2f82ee54
AE
533static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534{
535 kref_get(&rbdc->kref);
536
537 return rbdc;
538}
539
602adf40 540/*
1f7ba331
AE
541 * Find a ceph client with specific addr and configuration. If
542 * found, bump its reference count.
602adf40 543 */
1f7ba331 544static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
545{
546 struct rbd_client *client_node;
1f7ba331 547 bool found = false;
602adf40 548
43ae4701 549 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
550 return NULL;
551
1f7ba331
AE
552 spin_lock(&rbd_client_list_lock);
553 list_for_each_entry(client_node, &rbd_client_list, node) {
554 if (!ceph_compare_options(ceph_opts, client_node->client)) {
2f82ee54
AE
555 __rbd_get_client(client_node);
556
1f7ba331
AE
557 found = true;
558 break;
559 }
560 }
561 spin_unlock(&rbd_client_list_lock);
562
563 return found ? client_node : NULL;
602adf40
YS
564}
565
59c2be1e
YS
566/*
567 * mount options
568 */
569enum {
59c2be1e
YS
570 Opt_last_int,
571 /* int args above */
572 Opt_last_string,
573 /* string args above */
cc0538b6
AE
574 Opt_read_only,
575 Opt_read_write,
576 /* Boolean args above */
577 Opt_last_bool,
59c2be1e
YS
578};
579
43ae4701 580static match_table_t rbd_opts_tokens = {
59c2be1e
YS
581 /* int args above */
582 /* string args above */
be466c1c 583 {Opt_read_only, "read_only"},
cc0538b6
AE
584 {Opt_read_only, "ro"}, /* Alternate spelling */
585 {Opt_read_write, "read_write"},
586 {Opt_read_write, "rw"}, /* Alternate spelling */
587 /* Boolean args above */
59c2be1e
YS
588 {-1, NULL}
589};
590
98571b5a
AE
591struct rbd_options {
592 bool read_only;
593};
594
595#define RBD_READ_ONLY_DEFAULT false
596
59c2be1e
YS
597static int parse_rbd_opts_token(char *c, void *private)
598{
43ae4701 599 struct rbd_options *rbd_opts = private;
59c2be1e
YS
600 substring_t argstr[MAX_OPT_ARGS];
601 int token, intval, ret;
602
43ae4701 603 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
604 if (token < 0)
605 return -EINVAL;
606
607 if (token < Opt_last_int) {
608 ret = match_int(&argstr[0], &intval);
609 if (ret < 0) {
610 pr_err("bad mount option arg (not int) "
611 "at '%s'\n", c);
612 return ret;
613 }
614 dout("got int token %d val %d\n", token, intval);
615 } else if (token > Opt_last_int && token < Opt_last_string) {
616 dout("got string token %d val %s\n", token,
617 argstr[0].from);
cc0538b6
AE
618 } else if (token > Opt_last_string && token < Opt_last_bool) {
619 dout("got Boolean token %d\n", token);
59c2be1e
YS
620 } else {
621 dout("got token %d\n", token);
622 }
623
624 switch (token) {
cc0538b6
AE
625 case Opt_read_only:
626 rbd_opts->read_only = true;
627 break;
628 case Opt_read_write:
629 rbd_opts->read_only = false;
630 break;
59c2be1e 631 default:
aafb230e
AE
632 rbd_assert(false);
633 break;
59c2be1e
YS
634 }
635 return 0;
636}
637
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Ownership of ceph_opts always passes out of
 * the caller's hands: either to the new client or destroyed when an
 * existing client is reused.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
654
655/*
656 * Destroy ceph client
d23a4b3f 657 *
432b8587 658 * Caller must hold rbd_client_list_lock.
602adf40
YS
659 */
660static void rbd_client_release(struct kref *kref)
661{
662 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
37206ee5 664 dout("%s: rbdc %p\n", __func__, rbdc);
cd9d9f5d 665 spin_lock(&rbd_client_list_lock);
602adf40 666 list_del(&rbdc->node);
cd9d9f5d 667 spin_unlock(&rbd_client_list_lock);
602adf40
YS
668
669 ceph_destroy_client(rbdc->client);
670 kfree(rbdc);
671}
672
673/*
674 * Drop reference to ceph client node. If it's not referenced anymore, release
675 * it.
676 */
9d3997fd 677static void rbd_put_client(struct rbd_client *rbdc)
602adf40 678{
c53d5893
AE
679 if (rbdc)
680 kref_put(&rbdc->kref, rbd_client_release);
602adf40
YS
681}
682
a30b71b9
AE
683static bool rbd_image_format_valid(u32 image_format)
684{
685 return image_format == 1 || image_format == 2;
686}
687
8e94af8e
AE
688static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689{
103a150f
AE
690 size_t size;
691 u32 snap_count;
692
693 /* The header has to start with the magic rbd header text */
694 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695 return false;
696
db2388b6
AE
697 /* The bio layer requires at least sector-sized I/O */
698
699 if (ondisk->options.order < SECTOR_SHIFT)
700 return false;
701
702 /* If we use u64 in a few spots we may be able to loosen this */
703
704 if (ondisk->options.order > 8 * sizeof (int) - 1)
705 return false;
706
103a150f
AE
707 /*
708 * The size of a snapshot header has to fit in a size_t, and
709 * that limits the number of snapshots.
710 */
711 snap_count = le32_to_cpu(ondisk->snap_count);
712 size = SIZE_MAX - sizeof (struct ceph_snap_context);
713 if (snap_count > size / sizeof (__le64))
714 return false;
715
716 /*
717 * Not only that, but the size of the entire the snapshot
718 * header must also be representable in a size_t.
719 */
720 size -= snap_count * sizeof (__le64);
721 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722 return false;
723
724 return true;
8e94af8e
AE
725}
726
602adf40
YS
727/*
728 * Create a new header structure, translate header format from the on-disk
729 * header.
730 */
731static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 732 struct rbd_image_header_ondisk *ondisk)
602adf40 733{
ccece235 734 u32 snap_count;
58c17b0e 735 size_t len;
d2bb24e5 736 size_t size;
621901d6 737 u32 i;
602adf40 738
6a52325f
AE
739 memset(header, 0, sizeof (*header));
740
103a150f
AE
741 snap_count = le32_to_cpu(ondisk->snap_count);
742
58c17b0e
AE
743 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 745 if (!header->object_prefix)
602adf40 746 return -ENOMEM;
58c17b0e
AE
747 memcpy(header->object_prefix, ondisk->object_prefix, len);
748 header->object_prefix[len] = '\0';
00f1f36f 749
602adf40 750 if (snap_count) {
f785cc1d
AE
751 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
621901d6
AE
753 /* Save a copy of the snapshot names */
754
f785cc1d
AE
755 if (snap_names_len > (u64) SIZE_MAX)
756 return -EIO;
757 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 758 if (!header->snap_names)
6a52325f 759 goto out_err;
f785cc1d
AE
760 /*
761 * Note that rbd_dev_v1_header_read() guarantees
762 * the ondisk buffer we're working with has
763 * snap_names_len bytes beyond the end of the
764 * snapshot id array, this memcpy() is safe.
765 */
766 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767 snap_names_len);
6a52325f 768
621901d6
AE
769 /* Record each snapshot's size */
770
d2bb24e5
AE
771 size = snap_count * sizeof (*header->snap_sizes);
772 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 773 if (!header->snap_sizes)
6a52325f 774 goto out_err;
621901d6
AE
775 for (i = 0; i < snap_count; i++)
776 header->snap_sizes[i] =
777 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40
YS
778 } else {
779 header->snap_names = NULL;
780 header->snap_sizes = NULL;
781 }
849b4260 782
34b13184 783 header->features = 0; /* No features support in v1 images */
602adf40
YS
784 header->obj_order = ondisk->options.order;
785 header->crypt_type = ondisk->options.crypt_type;
786 header->comp_type = ondisk->options.comp_type;
6a52325f 787
621901d6
AE
788 /* Allocate and fill in the snapshot context */
789
f84344f3 790 header->image_size = le64_to_cpu(ondisk->image_size);
468521c1 791
812164f8 792 header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6a52325f
AE
793 if (!header->snapc)
794 goto out_err;
505cbb9b 795 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
621901d6 796 for (i = 0; i < snap_count; i++)
468521c1 797 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
798
799 return 0;
800
6a52325f 801out_err:
849b4260 802 kfree(header->snap_sizes);
ccece235 803 header->snap_sizes = NULL;
602adf40 804 kfree(header->snap_names);
ccece235 805 header->snap_names = NULL;
6a52325f
AE
806 kfree(header->object_prefix);
807 header->object_prefix = NULL;
ccece235 808
00f1f36f 809 return -ENOMEM;
602adf40
YS
810}
811
9e15b77d
AE
812static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
813{
814 struct rbd_snap *snap;
815
816 if (snap_id == CEPH_NOSNAP)
817 return RBD_SNAP_HEAD_NAME;
818
819 list_for_each_entry(snap, &rbd_dev->snaps, node)
820 if (snap_id == snap->id)
821 return snap->name;
822
823 return NULL;
824}
825
8b0241f8
AE
826static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
827 const char *snap_name)
602adf40 828{
e86924a8 829 struct rbd_snap *snap;
602adf40 830
8b0241f8
AE
831 list_for_each_entry(snap, &rbd_dev->snaps, node)
832 if (!strcmp(snap_name, snap->name))
833 return snap;
e86924a8 834
8b0241f8 835 return NULL;
602adf40
YS
836}
837
d1cf5788 838static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
602adf40 839{
0d7dbfce 840 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 841 sizeof (RBD_SNAP_HEAD_NAME))) {
99c1f08f 842 rbd_dev->mapping.size = rbd_dev->header.image_size;
34b13184 843 rbd_dev->mapping.features = rbd_dev->header.features;
602adf40 844 } else {
8b0241f8
AE
845 struct rbd_snap *snap;
846
847 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
848 if (!snap)
849 return -ENOENT;
8b0241f8
AE
850 rbd_dev->mapping.size = snap->size;
851 rbd_dev->mapping.features = snap->features;
f84344f3 852 rbd_dev->mapping.read_only = true;
602adf40 853 }
6d292906 854
8b0241f8 855 return 0;
602adf40
YS
856}
857
d1cf5788
AE
858static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
859{
860 rbd_dev->mapping.size = 0;
861 rbd_dev->mapping.features = 0;
862 rbd_dev->mapping.read_only = true;
863}
864
200a6a8b
AE
865static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
866{
867 rbd_dev->mapping.size = 0;
868 rbd_dev->mapping.features = 0;
869 rbd_dev->mapping.read_only = true;
870}
871
98571b5a 872static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
602adf40 873{
65ccfe21
AE
874 char *name;
875 u64 segment;
876 int ret;
602adf40 877
2fd82b9e 878 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
65ccfe21
AE
879 if (!name)
880 return NULL;
881 segment = offset >> rbd_dev->header.obj_order;
2fd82b9e 882 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
65ccfe21 883 rbd_dev->header.object_prefix, segment);
2fd82b9e 884 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
65ccfe21
AE
885 pr_err("error formatting segment name for #%llu (%d)\n",
886 segment, ret);
887 kfree(name);
888 name = NULL;
889 }
602adf40 890
65ccfe21
AE
891 return name;
892}
602adf40 893
65ccfe21
AE
894static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
895{
896 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 897
65ccfe21
AE
898 return offset & (segment_size - 1);
899}
900
901static u64 rbd_segment_length(struct rbd_device *rbd_dev,
902 u64 offset, u64 length)
903{
904 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
905
906 offset &= segment_size - 1;
907
aafb230e 908 rbd_assert(length <= U64_MAX - offset);
65ccfe21
AE
909 if (offset + length > segment_size)
910 length = segment_size - offset;
911
912 return length;
602adf40
YS
913}
914
029bcbd8
JD
915/*
916 * returns the size of an object in the image
917 */
918static u64 rbd_obj_bytes(struct rbd_image_header *header)
919{
920 return 1 << header->obj_order;
921}
922
602adf40
YS
923/*
924 * bio helpers
925 */
926
927static void bio_chain_put(struct bio *chain)
928{
929 struct bio *tmp;
930
931 while (chain) {
932 tmp = chain;
933 chain = chain->bi_next;
934 bio_put(tmp);
935 }
936}
937
938/*
939 * zeros a bio chain, starting at specific offset
940 */
941static void zero_bio_chain(struct bio *chain, int start_ofs)
942{
943 struct bio_vec *bv;
944 unsigned long flags;
945 void *buf;
946 int i;
947 int pos = 0;
948
949 while (chain) {
950 bio_for_each_segment(bv, chain, i) {
951 if (pos + bv->bv_len > start_ofs) {
952 int remainder = max(start_ofs - pos, 0);
953 buf = bvec_kmap_irq(bv, &flags);
954 memset(buf + remainder, 0,
955 bv->bv_len - remainder);
85b5aaa6 956 bvec_kunmap_irq(buf, &flags);
602adf40
YS
957 }
958 pos += bv->bv_len;
959 }
960
961 chain = chain->bi_next;
962 }
963}
964
b9434c5b
AE
965/*
966 * similar to zero_bio_chain(), zeros data defined by a page array,
967 * starting at the given byte offset from the start of the array and
968 * continuing up to the given end offset. The pages array is
969 * assumed to be big enough to hold all bytes up to the end.
970 */
971static void zero_pages(struct page **pages, u64 offset, u64 end)
972{
973 struct page **page = &pages[offset >> PAGE_SHIFT];
974
975 rbd_assert(end > offset);
976 rbd_assert(end - offset <= (u64)SIZE_MAX);
977 while (offset < end) {
978 size_t page_offset;
979 size_t length;
980 unsigned long flags;
981 void *kaddr;
982
983 page_offset = (size_t)(offset & ~PAGE_MASK);
984 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
985 local_irq_save(flags);
986 kaddr = kmap_atomic(*page);
987 memset(kaddr + page_offset, 0, length);
988 kunmap_atomic(kaddr);
989 local_irq_restore(flags);
990
991 offset += length;
992 page++;
993 }
994}
995
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the new bio on success, NULL on bad arguments or
 * allocation failure.  The clone shares the source's pages
 * (BIO_CLONED); only the bio_vec array is copied.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;              /* offset into the first segment */
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        /* resid is now the number of bytes used in the last segment */
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                /* Single segment: trim to exactly len bytes */
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
1076
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        /* Caller asked for more data than the chain holds */
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                /* Clone at most the remainder of this bio */
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        /* Source bio consumed; advance to the next one */
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        /* Release any clones built so far */
        bio_chain_put(chain);

        return NULL;
}
1139
926f9b3f
AE
1140/*
1141 * The default/initial value for all object request flags is 0. For
1142 * each flag, once its value is set to 1 it is never reset to 0
1143 * again.
1144 */
57acbaa7 1145static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
926f9b3f 1146{
57acbaa7 1147 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
926f9b3f
AE
1148 struct rbd_device *rbd_dev;
1149
57acbaa7
AE
1150 rbd_dev = obj_request->img_request->rbd_dev;
1151 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
926f9b3f
AE
1152 obj_request);
1153 }
1154}
1155
/* Test whether the object request carries image data; the barrier
 * orders this read against flag updates made on other CPUs. */
static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}
1161
57acbaa7 1162static void obj_request_done_set(struct rbd_obj_request *obj_request)
6365d33a 1163{
57acbaa7
AE
1164 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1165 struct rbd_device *rbd_dev = NULL;
6365d33a 1166
57acbaa7
AE
1167 if (obj_request_img_data_test(obj_request))
1168 rbd_dev = obj_request->img_request->rbd_dev;
1169 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
6365d33a
AE
1170 obj_request);
1171 }
1172}
1173
/* Test whether an object request has completed. */
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
1179
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        /* KNOWN is set last so EXISTS is valid once KNOWN is visible */
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}
1198
/* Has the existence of the target object been determined yet? */
static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}
1204
/* Does the target object exist?  Only meaningful once KNOWN is set. */
static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
1210
/* Take a reference on an object request. */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}
1217
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference; rbd_obj_request_destroy() runs when it reaches zero. */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1226
/* Take a reference on an image request. */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}
1233
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference; rbd_img_request_destroy() runs when it reaches zero. */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}
1242
/*
 * Add an object request to the tail of an image request's list.
 * The image request takes over the caller's (original) reference
 * to the object request; rbd_img_obj_request_del() drops it.
 * The object's "which" records its position in the list.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}
1259
/*
 * Remove an object request from its image request's list and drop
 * the reference the image request held on it.  Requests must be
 * removed in reverse of the order they were added (the "which"
 * assertion below enforces this).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}
1278
1279static bool obj_request_type_valid(enum obj_request_type type)
1280{
1281 switch (type) {
9969ebc5 1282 case OBJ_REQUEST_NODATA:
bf0d5f50 1283 case OBJ_REQUEST_BIO:
788e2df3 1284 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1285 return true;
1286 default:
1287 return false;
1288 }
1289}
1290
/* Hand an object request's osd request to the osd client for sending. */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1298
/*
 * Finish an image request: total up the bytes transferred by its
 * object requests (on success), then invoke its callback if it has
 * one, otherwise drop the reference held for its completion.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}
1324
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

/* Sleep (interruptibly) until the object request completes. */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}
1333
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never change thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_WRITE, &img_request->flags);
        smp_mb();
}
1344
/* Is this image request a write (vs. a read)? */
static bool img_request_write_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}
1350
/* Mark an image request as a child (parent-image) request. */
static void img_request_child_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_CHILD, &img_request->flags);
        smp_mb();
}
1356
/* Is this image request a child (parent-image) request? */
static bool img_request_child_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}
1362
/* Mark an image request as targeting a layered (cloned) image. */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
        set_bit(IMG_REQ_LAYERED, &img_request->flags);
        smp_mb();
}
1368
/* Does this image request target a layered (cloned) image? */
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
        smp_mb();
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
1374
/*
 * Completion handling for a read object request that is part of an
 * image request: zero-fill holes and short reads, then mark done.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
        u64 xferred = obj_request->xferred;
        u64 length = obj_request->length;

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, obj_request->img_request, obj_request->result,
                xferred, length);
        /*
         * ENOENT means a hole in the image.  We zero-fill the
         * entire length of the request.  A short read also implies
         * zero-fill to the end of the request.  Either way we
         * update the xferred count to indicate the whole request
         * was satisfied.
         */
        rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
        if (obj_request->result == -ENOENT) {
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, 0);
                else
                        zero_pages(obj_request->pages, 0, length);
                obj_request->result = 0;
                obj_request->xferred = length;
        } else if (xferred < length && !obj_request->result) {
                /* Short read: zero from where the data ended */
                if (obj_request->type == OBJ_REQUEST_BIO)
                        zero_bio_chain(obj_request->bio_list, xferred);
                else
                        zero_pages(obj_request->pages, xferred, length);
                obj_request->xferred = length;
        }
        obj_request_done_set(obj_request);
}
1408
/*
 * Deliver completion for an object request: invoke its callback if
 * it has one, otherwise wake up anything in rbd_obj_request_wait().
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p cb %p\n", __func__, obj_request,
                obj_request->callback);
        if (obj_request->callback)
                obj_request->callback(obj_request);
        else
                complete_all(&obj_request->completion);
}
1418
/* Op needs no result processing; just mark the request done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
1424
/*
 * osd completion for a read op.  For layered images, an ENOENT
 * inside the parent overlap means the data must be read from the
 * parent image instead; otherwise image reads get hole/short-read
 * fixup and standalone reads are simply marked done.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = NULL;
        struct rbd_device *rbd_dev = NULL;
        bool layered = false;

        if (obj_request_img_data_test(obj_request)) {
                img_request = obj_request->img_request;
                layered = img_request && img_request_layered_test(img_request);
                rbd_dev = img_request->rbd_dev;
        }

        dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
                obj_request, img_request, obj_request->result,
                obj_request->xferred, obj_request->length);
        if (layered && obj_request->result == -ENOENT &&
                        obj_request->img_offset < rbd_dev->parent_overlap)
                rbd_img_parent_read(obj_request);
        else if (img_request)
                rbd_img_obj_request_read_callback(obj_request);
        else
                obj_request_done_set(obj_request);
}
1448
/* osd completion for a write op. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p result %d %llu\n", __func__, obj_request,
                obj_request->result, obj_request->length);
        /*
         * There is no such thing as a successful short write.  Set
         * it to our originally-requested length.
         */
        obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
}
1460
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);
        obj_request_done_set(obj_request);
}
1470
/*
 * Main osd client completion callback for rbd requests.  Records
 * the result, transfer count and version, then dispatches to the
 * per-opcode handler (keyed on the request's *first* op).  If the
 * handler marked the request done, completion is delivered here;
 * otherwise the handler has arranged for it to happen later.
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
{
        struct rbd_obj_request *obj_request = osd_req->r_priv;
        u16 opcode;

        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        /* Image-data requests have a valid "which"; others must not */
        if (obj_request_img_data_test(obj_request)) {
                rbd_assert(obj_request->img_request);
                rbd_assert(obj_request->which != BAD_WHICH);
        } else {
                rbd_assert(obj_request->which == BAD_WHICH);
        }

        if (osd_req->r_result < 0)
                obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

        /* At most two ops (e.g. copyup + write) are ever used */
        BUG_ON(osd_req->r_num_ops > 2);

        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
        obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64)UINT_MAX);
        opcode = osd_req->r_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
                        obj_request->object_name, (unsigned short) opcode);
                break;
        }

        if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
}
1523
/*
 * Finalize (encode) a read osd request.  Reads use the snapshot id
 * of the image request (CEPH_NOSNAP for standalone requests) and
 * no snapshot context or mtime.
 */
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        u64 snap_id;

        rbd_assert(osd_req != NULL);

        snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        NULL, snap_id, NULL);
}

/*
 * Finalize (encode) a write osd request.  Writes carry the image
 * request's snapshot context (NULL for standalone requests) and a
 * modification time.
 */
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request = obj_request->img_request;
        struct ceph_osd_request *osd_req = obj_request->osd_req;
        struct ceph_snap_context *snapc;
        struct timespec mtime = CURRENT_TIME;

        rbd_assert(osd_req != NULL);

        snapc = img_request ? img_request->snapc : NULL;
        ceph_osdc_build_request(osd_req, obj_request->offset,
                        snapc, CEPH_NOSNAP, &mtime);
}
1550
/*
 * Allocate and initialize a single-op osd request for an object
 * request.  Returns NULL on allocation failure.  The caller is
 * responsible for setting up the op and formatting the request.
 */
static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        bool write_request,
                                        struct rbd_obj_request *obj_request)
{
        struct ceph_snap_context *snapc = NULL;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        if (obj_request_img_data_test(obj_request)) {
                struct rbd_img_request *img_request = obj_request->img_request;

                /* Direction must match the image request's */
                rbd_assert(write_request ==
                                img_request_write_test(img_request));
                if (write_request)
                        snapc = img_request->snapc;
        }

        /* Allocate and initialize the request, for the single op */

        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        if (write_request)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;

        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}
1592
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        struct ceph_snap_context *snapc;
        struct rbd_device *rbd_dev;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;

        /* Copyup only happens for writes that are part of an image request */
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);
        rbd_assert(img_request_write_test(img_request));

        /* Allocate and initialize the request, for the two ops */

        snapc = img_request->snapc;
        rbd_dev = img_request->rbd_dev;
        osdc = &rbd_dev->rbd_client->client->osdc;
        osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */

        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        osd_req->r_callback = rbd_osd_req_callback;
        osd_req->r_priv = obj_request;

        osd_req->r_oid_len = strlen(obj_request->object_name);
        rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
        memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

        osd_req->r_file_layout = rbd_dev->layout;       /* struct */

        return osd_req;
}

1633
1634
/* Release an osd request (drops the osd client's reference on it). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1639
/* object_name is assumed to be a non-null pointer and NUL-terminated */

/*
 * Allocate and initialize an object request covering the given
 * byte range of the named object.  The object name is copied into
 * the same allocation as the request itself.  Returns NULL on
 * allocation failure.  The caller owns the initial reference.
 */
static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
                                                u64 offset, u64 length,
                                                enum obj_request_type type)
{
        struct rbd_obj_request *obj_request;
        size_t size;
        char *name;

        rbd_assert(obj_request_type_valid(type));

        /* Single allocation: request struct followed by the name copy */
        size = strlen(object_name) + 1;
        obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
        if (!obj_request)
                return NULL;

        name = (char *)(obj_request + 1);
        obj_request->object_name = memcpy(name, object_name, size);
        obj_request->offset = offset;
        obj_request->length = length;
        obj_request->flags = 0;
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);

        dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
                offset, length, (int)type, obj_request);

        return obj_request;
}
1673
/*
 * kref release function for object requests: frees the osd request
 * and any attached data (bio chain or page vector), then the
 * request itself.  Must not be called while the request is still
 * linked into an image request.
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
        struct rbd_obj_request *obj_request;

        obj_request = container_of(kref, struct rbd_obj_request, kref);

        dout("%s: obj %p\n", __func__, obj_request);

        rbd_assert(obj_request->img_request == NULL);
        rbd_assert(obj_request->which == BAD_WHICH);

        if (obj_request->osd_req)
                rbd_osd_req_destroy(obj_request->osd_req);

        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
        case OBJ_REQUEST_NODATA:
                break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
                break;
        case OBJ_REQUEST_PAGES:
                if (obj_request->pages)
                        ceph_release_page_vector(obj_request->pages,
                                                obj_request->page_count);
                break;
        }

        kfree(obj_request);
}
1705
1706/*
1707 * Caller is responsible for filling in the list of object requests
1708 * that comprises the image request, and the Linux request pointer
1709 * (if there is one).
1710 */
cc344fa1
AE
1711static struct rbd_img_request *rbd_img_request_create(
1712 struct rbd_device *rbd_dev,
bf0d5f50 1713 u64 offset, u64 length,
9849e986
AE
1714 bool write_request,
1715 bool child_request)
bf0d5f50
AE
1716{
1717 struct rbd_img_request *img_request;
bf0d5f50
AE
1718
1719 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1720 if (!img_request)
1721 return NULL;
1722
1723 if (write_request) {
1724 down_read(&rbd_dev->header_rwsem);
812164f8 1725 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1726 up_read(&rbd_dev->header_rwsem);
bf0d5f50
AE
1727 }
1728
1729 img_request->rq = NULL;
1730 img_request->rbd_dev = rbd_dev;
1731 img_request->offset = offset;
1732 img_request->length = length;
0c425248
AE
1733 img_request->flags = 0;
1734 if (write_request) {
1735 img_request_write_set(img_request);
468521c1 1736 img_request->snapc = rbd_dev->header.snapc;
0c425248 1737 } else {
bf0d5f50 1738 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1739 }
9849e986
AE
1740 if (child_request)
1741 img_request_child_set(img_request);
d0b2e944
AE
1742 if (rbd_dev->parent_spec)
1743 img_request_layered_set(img_request);
bf0d5f50
AE
1744 spin_lock_init(&img_request->completion_lock);
1745 img_request->next_completion = 0;
1746 img_request->callback = NULL;
a5a337d4 1747 img_request->result = 0;
bf0d5f50
AE
1748 img_request->obj_request_count = 0;
1749 INIT_LIST_HEAD(&img_request->obj_requests);
1750 kref_init(&img_request->kref);
1751
1752 rbd_img_request_get(img_request); /* Avoid a warning */
1753 rbd_img_request_put(img_request); /* TEMPORARY */
1754
37206ee5
AE
1755 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756 write_request ? "write" : "read", offset, length,
1757 img_request);
1758
bf0d5f50
AE
1759 return img_request;
1760}
1761
/*
 * kref release function for image requests: deletes (and drops the
 * references on) all remaining object requests, releases the write
 * snapshot context, and for child requests drops the reference on
 * the originating object request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
        struct rbd_img_request *img_request;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;

        img_request = container_of(kref, struct rbd_img_request, kref);

        dout("%s: img %p\n", __func__, img_request);

        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
        rbd_assert(img_request->obj_request_count == 0);

        if (img_request_write_test(img_request))
                ceph_put_snap_context(img_request->snapc);

        if (img_request_child_test(img_request))
                rbd_obj_request_put(img_request->obj_request);

        kfree(img_request);
}
1784
/*
 * Account for the completion of one object request within an image
 * request: record the first error, release the (borrowed) page
 * array, and notify the upper layer.  Returns true if the image
 * request still has more object requests to complete.
 */
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        unsigned int xferred;
        int result;
        bool more;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
        xferred = (unsigned int)obj_request->xferred;
        result = obj_request->result;
        if (result) {
                struct rbd_device *rbd_dev = img_request->rbd_dev;

                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
                        img_request_write_test(img_request) ? "write" : "read",
                        obj_request->length, obj_request->img_offset,
                        obj_request->offset);
                rbd_warn(rbd_dev, "  result %d xferred %x\n",
                        result, xferred);
                /* Only the first error is recorded for the image request */
                if (!img_request->result)
                        img_request->result = result;
        }

        /* Image object requests don't own their page array */

        if (obj_request->type == OBJ_REQUEST_PAGES) {
                obj_request->pages = NULL;
                obj_request->page_count = 0;
        }

        if (img_request_child_test(img_request)) {
                /* Child requests complete when the last object does */
                rbd_assert(img_request->obj_request != NULL);
                more = obj_request->which < img_request->obj_request_count - 1;
        } else {
                /* Top-level requests feed the block layer incrementally */
                rbd_assert(img_request->rq != NULL);
                more = blk_end_request(img_request->rq, result, xferred);
        }

        return more;
}
1828
/*
 * Completion callback for object requests belonging to an image
 * request.  Object requests may complete out of order, but results
 * must be delivered in order; next_completion tracks the first
 * request not yet delivered, and each callback delivers the run of
 * consecutive completed requests starting there (under the lock).
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
        struct rbd_img_request *img_request;
        u32 which = obj_request->which;
        bool more = true;

        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;

        dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        rbd_assert(img_request != NULL);
        rbd_assert(img_request->obj_request_count > 0);
        rbd_assert(which != BAD_WHICH);
        rbd_assert(which < img_request->obj_request_count);
        rbd_assert(which >= img_request->next_completion);

        spin_lock_irq(&img_request->completion_lock);
        if (which != img_request->next_completion)
                goto out;       /* an earlier request is still outstanding */

        for_each_obj_request_from(img_request, obj_request) {
                rbd_assert(more);
                rbd_assert(which < img_request->obj_request_count);

                if (!obj_request_done_test(obj_request))
                        break;
                more = rbd_img_obj_end_request(obj_request);
                which++;
        }

        /* more is false exactly when every object request is delivered */
        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
out:
        spin_unlock_irq(&img_request->completion_lock);

        if (!more)
                rbd_img_request_complete(img_request);
}
1867
f1a4739f
AE
1868/*
1869 * Split up an image request into one or more object requests, each
1870 * to a different object. The "type" parameter indicates whether
1871 * "data_desc" is the pointer to the head of a list of bio
1872 * structures, or the base of a page array. In either case this
1873 * function assumes data_desc describes memory sufficient to hold
1874 * all data described by the image request.
1875 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;		/* valid only for OBJ_REQUEST_BIO */
	unsigned int bio_offset = 0;
	struct page **pages;		/* valid only for OBJ_REQUEST_PAGES */
	u64 img_offset;
	u64 resid;			/* bytes of the image request not yet covered */
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	/* Interpret data_desc according to "type" (see block comment above) */
	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	/*
	 * Walk the image extent segment by segment, creating one
	 * object request per backing rados object touched.
	 */
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;	/* offset of this piece within its object */
		u64 length;	/* length of this piece (capped at object end) */

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			/* Clone just the portion of the bio chain for this object */
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			/* Hand this object request its slice of the page array */
			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	/* Drop the request that was created but not yet added to the image */
	rbd_obj_request_put(obj_request);
out_unwind:
	/* Tear down every object request already attached to the image */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
1986
0eefd470
AE
/*
 * Completion callback for the two-op (copyup + write) osd request
 * built by rbd_img_obj_parent_read_full_callback().  Releases the
 * page vector that held the parent data, fixes up the transfer
 * count, then hands off to the normal image object callback.
 */
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	/* The copyup pages always cover one full backing object */
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}
2022
3d7efd18
AE
2023static void
2024rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2025{
2026 struct rbd_obj_request *orig_request;
0eefd470
AE
2027 struct ceph_osd_request *osd_req;
2028 struct ceph_osd_client *osdc;
2029 struct rbd_device *rbd_dev;
3d7efd18 2030 struct page **pages;
3d7efd18
AE
2031 int result;
2032 u64 obj_size;
2033 u64 xferred;
2034
2035 rbd_assert(img_request_child_test(img_request));
2036
2037 /* First get what we need from the image request */
2038
2039 pages = img_request->copyup_pages;
2040 rbd_assert(pages != NULL);
2041 img_request->copyup_pages = NULL;
2042
2043 orig_request = img_request->obj_request;
2044 rbd_assert(orig_request != NULL);
0eefd470 2045 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
3d7efd18
AE
2046 result = img_request->result;
2047 obj_size = img_request->length;
2048 xferred = img_request->xferred;
2049
0eefd470
AE
2050 rbd_dev = img_request->rbd_dev;
2051 rbd_assert(rbd_dev);
2052 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2053
3d7efd18
AE
2054 rbd_img_request_put(img_request);
2055
0eefd470
AE
2056 if (result)
2057 goto out_err;
2058
2059 /* Allocate the new copyup osd request for the original request */
2060
2061 result = -ENOMEM;
2062 rbd_assert(!orig_request->osd_req);
2063 osd_req = rbd_osd_req_create_copyup(orig_request);
2064 if (!osd_req)
2065 goto out_err;
2066 orig_request->osd_req = osd_req;
2067 orig_request->copyup_pages = pages;
3d7efd18 2068
0eefd470 2069 /* Initialize the copyup op */
3d7efd18 2070
0eefd470
AE
2071 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2073 false, false);
3d7efd18 2074
0eefd470
AE
2075 /* Then the original write request op */
2076
2077 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078 orig_request->offset,
2079 orig_request->length, 0, 0);
2080 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081 orig_request->length);
2082
2083 rbd_osd_req_format_write(orig_request);
2084
2085 /* All set, send it off. */
2086
2087 orig_request->callback = rbd_img_obj_copyup_callback;
2088 osdc = &rbd_dev->rbd_client->client->osdc;
2089 result = rbd_obj_request_submit(osdc, orig_request);
2090 if (!result)
2091 return;
2092out_err:
2093 /* Record the error code and complete the request */
2094
2095 orig_request->result = result;
2096 orig_request->xferred = 0;
2097 obj_request_done_set(orig_request);
2098 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2099}
2100
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to use any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;	/* so out_err doesn't free an ERR_PTR */
		goto out_err;
	}

	result = -ENOMEM;
	/* Child (parent-image) read request; not a write, marked as child */
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	/* The parent request holds a reference to the original request */
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	/* Submit failed: undo the linkage and the extra reference */
	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
2205
c5b5ef6c
AE
2206static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2207{
c5b5ef6c
AE
2208 struct rbd_obj_request *orig_request;
2209 int result;
2210
2211 rbd_assert(!obj_request_img_data_test(obj_request));
2212
2213 /*
2214 * All we need from the object request is the original
2215 * request and the result of the STAT op. Grab those, then
2216 * we're done with the request.
2217 */
2218 orig_request = obj_request->obj_request;
2219 obj_request->obj_request = NULL;
2220 rbd_assert(orig_request);
2221 rbd_assert(orig_request->img_request);
2222
2223 result = obj_request->result;
2224 obj_request->result = 0;
2225
2226 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227 obj_request, orig_request, result,
2228 obj_request->xferred, obj_request->length);
2229 rbd_obj_request_put(obj_request);
2230
2231 rbd_assert(orig_request);
2232 rbd_assert(orig_request->img_request);
c5b5ef6c
AE
2233
2234 /*
2235 * Our only purpose here is to determine whether the object
2236 * exists, and we don't want to treat the non-existence as
2237 * an error. If something else comes back, transfer the
2238 * error to the original request and complete it now.
2239 */
2240 if (!result) {
2241 obj_request_existence_set(orig_request, true);
2242 } else if (result == -ENOENT) {
2243 obj_request_existence_set(orig_request, false);
2244 } else if (result) {
2245 orig_request->result = result;
3d7efd18 2246 goto out;
c5b5ef6c
AE
2247 }
2248
2249 /*
2250 * Resubmit the original request now that we have recorded
2251 * whether the target object exists.
2252 */
b454e36d 2253 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2254out:
c5b5ef6c
AE
2255 if (orig_request->result)
2256 rbd_obj_request_complete(orig_request);
2257 rbd_obj_request_put(orig_request);
2258}
2259
2260static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2261{
2262 struct rbd_obj_request *stat_request;
2263 struct rbd_device *rbd_dev;
2264 struct ceph_osd_client *osdc;
2265 struct page **pages = NULL;
2266 u32 page_count;
2267 size_t size;
2268 int ret;
2269
2270 /*
2271 * The response data for a STAT call consists of:
2272 * le64 length;
2273 * struct {
2274 * le32 tv_sec;
2275 * le32 tv_nsec;
2276 * } mtime;
2277 */
2278 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279 page_count = (u32)calc_pages_for(0, size);
2280 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2281 if (IS_ERR(pages))
2282 return PTR_ERR(pages);
2283
2284 ret = -ENOMEM;
2285 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2286 OBJ_REQUEST_PAGES);
2287 if (!stat_request)
2288 goto out;
2289
2290 rbd_obj_request_get(obj_request);
2291 stat_request->obj_request = obj_request;
2292 stat_request->pages = pages;
2293 stat_request->page_count = page_count;
2294
2295 rbd_assert(obj_request->img_request);
2296 rbd_dev = obj_request->img_request->rbd_dev;
2297 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2298 stat_request);
2299 if (!stat_request->osd_req)
2300 goto out;
2301 stat_request->callback = rbd_img_obj_exists_callback;
2302
2303 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2305 false, false);
9d4df01f 2306 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2307
2308 osdc = &rbd_dev->rbd_client->client->osdc;
2309 ret = rbd_obj_request_submit(osdc, stat_request);
2310out:
2311 if (ret)
2312 rbd_obj_request_put(obj_request);
2313
2314 return ret;
2315}
2316
b454e36d
AE
2317static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2318{
2319 struct rbd_img_request *img_request;
a9e8ba2c 2320 struct rbd_device *rbd_dev;
3d7efd18 2321 bool known;
b454e36d
AE
2322
2323 rbd_assert(obj_request_img_data_test(obj_request));
2324
2325 img_request = obj_request->img_request;
2326 rbd_assert(img_request);
a9e8ba2c 2327 rbd_dev = img_request->rbd_dev;
b454e36d 2328
b454e36d 2329 /*
a9e8ba2c
AE
2330 * Only writes to layered images need special handling.
2331 * Reads and non-layered writes are simple object requests.
2332 * Layered writes that start beyond the end of the overlap
2333 * with the parent have no parent data, so they too are
2334 * simple object requests. Finally, if the target object is
2335 * known to already exist, its parent data has already been
2336 * copied, so a write to the object can also be handled as a
2337 * simple object request.
b454e36d
AE
2338 */
2339 if (!img_request_write_test(img_request) ||
2340 !img_request_layered_test(img_request) ||
a9e8ba2c 2341 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2342 ((known = obj_request_known_test(obj_request)) &&
2343 obj_request_exists_test(obj_request))) {
b454e36d
AE
2344
2345 struct rbd_device *rbd_dev;
2346 struct ceph_osd_client *osdc;
2347
2348 rbd_dev = obj_request->img_request->rbd_dev;
2349 osdc = &rbd_dev->rbd_client->client->osdc;
2350
2351 return rbd_obj_request_submit(osdc, obj_request);
2352 }
2353
2354 /*
3d7efd18
AE
2355 * It's a layered write. The target object might exist but
2356 * we may not know that yet. If we know it doesn't exist,
2357 * start by reading the data for the full target object from
2358 * the parent so we can use it for a copyup to the target.
b454e36d 2359 */
3d7efd18
AE
2360 if (known)
2361 return rbd_img_obj_parent_read_full(obj_request);
2362
2363 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2364
2365 return rbd_img_obj_exists_submit(obj_request);
2366}
2367
bf0d5f50
AE
2368static int rbd_img_request_submit(struct rbd_img_request *img_request)
2369{
bf0d5f50 2370 struct rbd_obj_request *obj_request;
46faeed4 2371 struct rbd_obj_request *next_obj_request;
bf0d5f50 2372
37206ee5 2373 dout("%s: img %p\n", __func__, img_request);
46faeed4 2374 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2375 int ret;
2376
b454e36d 2377 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2378 if (ret)
2379 return ret;
bf0d5f50
AE
2380 }
2381
2382 return 0;
2383}
8b3e1a56
AE
2384
/*
 * Completion callback for the parent-image read created by
 * rbd_img_parent_read().  Propagates the result to the original
 * (child) object request, clamping the transfer count at the
 * parent overlap boundary so stale/undefined parent data beyond
 * it gets zeroed, then completes the original request.
 */
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;

	rbd_assert(img_request_child_test(img_request));

	obj_request = img_request->obj_request;
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);

	obj_request->result = img_request->result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	rbd_dev = obj_request->img_request->rbd_dev;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_request->xferred, xferred);
	} else {
		obj_request->xferred = img_request->xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
2426
/*
 * An object read came back -ENOENT on a layered image: satisfy it
 * from the parent image instead.  Builds a child image request
 * over the same image extent, reusing the original bio chain as
 * its data descriptor; rbd_img_parent_read_callback() finishes the
 * original request when the parent read completes.  On any setup
 * failure the error is recorded on the object request and it is
 * marked done.
 */
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev;
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	rbd_dev = obj_request->img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);
	/* rbd_read_finish(obj_request, obj_request->length); */
	/* Child request against the parent image: read, marked as child */
	img_request = rbd_img_request_create(rbd_dev->parent,
						obj_request->img_offset,
						obj_request->length,
						false, true);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	/* The child image request holds a reference to the original request */
	rbd_obj_request_get(obj_request);
	img_request->obj_request = obj_request;

	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					obj_request->bio_list);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}
bf0d5f50 2470
/*
 * Acknowledge a notify event on the header object by sending a
 * NOTIFY_ACK osd op.  The request is fire-and-forget: its callback
 * is rbd_obj_request_put, so it releases itself on completion.
 * Returns 0 on successful submission or a negative error code.
 */
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
						OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	/* Self-releasing: the completion callback drops the last reference */
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, ver, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
2500
/*
 * Watch event callback registered with the osd client for the
 * header object (see rbd_dev_header_watch_sync()).  Refreshes the
 * device's view of the header and acknowledges the notification.
 * The refresh result is deliberately ignored (best effort).
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	(void)rbd_dev_refresh(rbd_dev, &hver);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}
2516
9969ebc5
AE
2517/*
2518 * Request sync osd watch/unwatch. The value of "start" determines
2519 * whether a watch request is being initiated or torn down.
2520 */
2521static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2522{
2523 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2524 struct rbd_obj_request *obj_request;
9969ebc5
AE
2525 int ret;
2526
2527 rbd_assert(start ^ !!rbd_dev->watch_event);
2528 rbd_assert(start ^ !!rbd_dev->watch_request);
2529
2530 if (start) {
3c663bbd 2531 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2532 &rbd_dev->watch_event);
2533 if (ret < 0)
2534 return ret;
8eb87565 2535 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2536 }
2537
2538 ret = -ENOMEM;
2539 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2540 OBJ_REQUEST_NODATA);
2541 if (!obj_request)
2542 goto out_cancel;
2543
430c28c3
AE
2544 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2545 if (!obj_request->osd_req)
2546 goto out_cancel;
2547
8eb87565 2548 if (start)
975241af 2549 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2550 else
6977c3f9 2551 ceph_osdc_unregister_linger_request(osdc,
975241af 2552 rbd_dev->watch_request->osd_req);
2169238d
AE
2553
2554 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
b21ebddd 2555 rbd_dev->watch_event->cookie, 0, start);
9d4df01f 2556 rbd_osd_req_format_write(obj_request);
2169238d 2557
9969ebc5
AE
2558 ret = rbd_obj_request_submit(osdc, obj_request);
2559 if (ret)
2560 goto out_cancel;
2561 ret = rbd_obj_request_wait(obj_request);
2562 if (ret)
2563 goto out_cancel;
9969ebc5
AE
2564 ret = obj_request->result;
2565 if (ret)
2566 goto out_cancel;
2567
8eb87565
AE
2568 /*
2569 * A watch request is set to linger, so the underlying osd
2570 * request won't go away until we unregister it. We retain
2571 * a pointer to the object request during that time (in
2572 * rbd_dev->watch_request), so we'll keep a reference to
2573 * it. We'll drop that reference (below) after we've
2574 * unregistered it.
2575 */
2576 if (start) {
2577 rbd_dev->watch_request = obj_request;
2578
2579 return 0;
2580 }
2581
2582 /* We have successfully torn down the watch request */
2583
2584 rbd_obj_request_put(rbd_dev->watch_request);
2585 rbd_dev->watch_request = NULL;
9969ebc5
AE
2586out_cancel:
2587 /* Cancel the event if we're tearing down, or on error */
2588 ceph_osdc_cancel_event(rbd_dev->watch_event);
2589 rbd_dev->watch_event = NULL;
9969ebc5
AE
2590 if (obj_request)
2591 rbd_obj_request_put(obj_request);
2592
2593 return ret;
2594}
2595
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the outbound buffer, or a negative error code.
 *
 * "outbound"/"outbound_size" carry the method parameters (may be
 * empty); the method's reply is copied into "inbound" (at most
 * "inbound_size" bytes).  If "version" is non-NULL it receives the
 * object version observed by the request.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			const char *object_name,
			const char *class_name,
			const char *method_name,
			const void *outbound,
			size_t outbound_size,
			void *inbound,
			size_t inbound_size,
			u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
						OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		/* NOTE(review): ceph_pagelist_append() return value ignored —
		 * an allocation failure here would go undetected; verify. */
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	/* Success: return byte count and copy the reply to the caller */
	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
2684
/*
 * Block-layer request function.  Pulls requests off the queue and
 * turns each into an image request (filled from the request's bio
 * chain and submitted to the osd client).  The queue lock is
 * dropped around the per-request work and re-taken before ending
 * a failed request, per the sparse annotations below.
 */
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Real work below may sleep/allocate; drop the queue lock */
		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		/* Guard against u64 wraparound of offset + length */
		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}
2776
602adf40
YS
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of "bvec" that may be merged into the
 * bio described by "bmd" without crossing an rbd object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	/* sectors_per_obj is a power of two; mask gives offset within object */
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
2822
2823static void rbd_free_disk(struct rbd_device *rbd_dev)
2824{
2825 struct gendisk *disk = rbd_dev->disk;
2826
2827 if (!disk)
2828 return;
2829
a0cab924
AE
2830 rbd_dev->disk = NULL;
2831 if (disk->flags & GENHD_FL_UP) {
602adf40 2832 del_gendisk(disk);
a0cab924
AE
2833 if (disk->queue)
2834 blk_cleanup_queue(disk->queue);
2835 }
602adf40
YS
2836 put_disk(disk);
2837}
2838
788e2df3
AE
2839static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2840 const char *object_name,
7097f8df 2841 u64 offset, u64 length, void *buf)
788e2df3
AE
2842
2843{
2169238d 2844 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2845 struct rbd_obj_request *obj_request;
788e2df3
AE
2846 struct page **pages = NULL;
2847 u32 page_count;
1ceae7ef 2848 size_t size;
788e2df3
AE
2849 int ret;
2850
2851 page_count = (u32) calc_pages_for(offset, length);
2852 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2853 if (IS_ERR(pages))
2854 ret = PTR_ERR(pages);
2855
2856 ret = -ENOMEM;
2857 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2858 OBJ_REQUEST_PAGES);
788e2df3
AE
2859 if (!obj_request)
2860 goto out;
2861
2862 obj_request->pages = pages;
2863 obj_request->page_count = page_count;
2864
430c28c3 2865 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
2866 if (!obj_request->osd_req)
2867 goto out;
2868
c99d2d4a
AE
2869 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2870 offset, length, 0, 0);
406e2c9f 2871 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 2872 obj_request->pages,
44cd188d
AE
2873 obj_request->length,
2874 obj_request->offset & ~PAGE_MASK,
2875 false, false);
9d4df01f 2876 rbd_osd_req_format_read(obj_request);
430c28c3 2877
788e2df3
AE
2878 ret = rbd_obj_request_submit(osdc, obj_request);
2879 if (ret)
2880 goto out;
2881 ret = rbd_obj_request_wait(obj_request);
2882 if (ret)
2883 goto out;
2884
2885 ret = obj_request->result;
2886 if (ret < 0)
2887 goto out;
1ceae7ef
AE
2888
2889 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2890 size = (size_t) obj_request->xferred;
903bb32e 2891 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
2892 rbd_assert(size <= (size_t)INT_MAX);
2893 ret = (int)size;
788e2df3
AE
2894out:
2895 if (obj_request)
2896 rbd_obj_request_put(obj_request);
2897 else
2898 ceph_release_page_vector(pages, page_count);
2899
2900 return ret;
2901}
2902
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;	/* snapshots seen on the previous pass */
	u64 names_size = 0;	/* bytes of snapshot names on previous pass */
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);	/* no-op on the first iteration (NULL) */

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out_err;
		/* rbd_obj_read_sync() returns the byte count on success */
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		/* Retry with a bigger buffer if snapshots were added */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
2969
/*
 * (Re)load the on-disk format 1 header and decode it into @header.
 * Returns 0 on success or a negative errno.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev);
	if (IS_ERR(ondisk)) {
		ret = PTR_ERR(ondisk);
	} else {
		ret = rbd_header_from_disk(header, ondisk);
		kfree(ondisk);	/* only the decoded form is kept */
	}

	return ret;
}
2987
/* Unlink and free every entry on the device's snapshot list. */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant because each node is freed while iterating */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
		list_del(&snap->node);
		rbd_snap_destroy(snap);
	}
}
2998
9478554a
AE
2999static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3000{
0d7dbfce 3001 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
3002 return;
3003
e28626a0
AE
3004 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3005 sector_t size;
3006
3007 rbd_dev->mapping.size = rbd_dev->header.image_size;
3008 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3009 dout("setting size to %llu sectors", (unsigned long long)size);
3010 set_capacity(rbd_dev->disk, size);
3011 }
9478554a
AE
3012}
3013
602adf40
YS
3014/*
3015 * only read the first part of the ondisk header, without the snaps info
3016 */
117973fb 3017static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
3018{
3019 int ret;
3020 struct rbd_image_header h;
602adf40
YS
3021
3022 ret = rbd_read_header(rbd_dev, &h);
3023 if (ret < 0)
3024 return ret;
3025
a51aa0c0
JD
3026 down_write(&rbd_dev->header_rwsem);
3027
9478554a
AE
3028 /* Update image size, and check for resize of mapped image */
3029 rbd_dev->header.image_size = h.image_size;
3030 rbd_update_mapping_size(rbd_dev);
9db4b3e3 3031
849b4260 3032 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 3033 kfree(rbd_dev->header.snap_sizes);
849b4260 3034 kfree(rbd_dev->header.snap_names);
d1d25646 3035 /* osd requests may still refer to snapc */
812164f8 3036 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 3037
93a24e08 3038 rbd_dev->header.image_size = h.image_size;
602adf40
YS
3039 rbd_dev->header.snapc = h.snapc;
3040 rbd_dev->header.snap_names = h.snap_names;
3041 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260 3042 /* Free the extra copy of the object prefix */
c0cd10db
AE
3043 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3044 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
849b4260
AE
3045 kfree(h.object_prefix);
3046
304f6808 3047 ret = rbd_dev_snaps_update(rbd_dev);
dfc5606d 3048
c666601a 3049 up_write(&rbd_dev->header_rwsem);
602adf40 3050
dfc5606d 3051 return ret;
602adf40
YS
3052}
3053
117973fb 3054static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993 3055{
a3fbe5d4 3056 u64 image_size;
1fe5e993
AE
3057 int ret;
3058
117973fb 3059 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
a3fbe5d4 3060 image_size = rbd_dev->header.image_size;
1fe5e993 3061 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
3062 if (rbd_dev->image_format == 1)
3063 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3064 else
3065 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993 3066 mutex_unlock(&ctl_mutex);
522a0cc0
AE
3067 if (ret)
3068 rbd_warn(rbd_dev, "got notification but failed to "
3069 " update snaps: %d\n", ret);
a3fbe5d4
AE
3070 if (image_size != rbd_dev->header.image_size)
3071 revalidate_disk(rbd_dev->disk);
1fe5e993
AE
3072
3073 return ret;
3074}
3075
602adf40
YS
/*
 * Allocate and configure the gendisk and request queue for the
 * mapped image.  Returns 0 on success, -ENOMEM on any failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
3121
dfc5606d
YS
3122/*
3123 sysfs
3124*/
3125
593a9e7b
AE
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
3130
dfc5606d
YS
3131static ssize_t rbd_size_show(struct device *dev,
3132 struct device_attribute *attr, char *buf)
3133{
593a9e7b 3134 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3135
fc71d833
AE
3136 return sprintf(buf, "%llu\n",
3137 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3138}
3139
34b13184
AE
3140/*
3141 * Note this shows the features for whatever's mapped, which is not
3142 * necessarily the base image.
3143 */
3144static ssize_t rbd_features_show(struct device *dev,
3145 struct device_attribute *attr, char *buf)
3146{
3147 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3148
3149 return sprintf(buf, "0x%016llx\n",
fc71d833 3150 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3151}
3152
dfc5606d
YS
3153static ssize_t rbd_major_show(struct device *dev,
3154 struct device_attribute *attr, char *buf)
3155{
593a9e7b 3156 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3157
fc71d833
AE
3158 if (rbd_dev->major)
3159 return sprintf(buf, "%d\n", rbd_dev->major);
3160
3161 return sprintf(buf, "(none)\n");
3162
dfc5606d
YS
3163}
3164
3165static ssize_t rbd_client_id_show(struct device *dev,
3166 struct device_attribute *attr, char *buf)
602adf40 3167{
593a9e7b 3168 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3169
1dbb4399
AE
3170 return sprintf(buf, "client%lld\n",
3171 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3172}
3173
dfc5606d
YS
3174static ssize_t rbd_pool_show(struct device *dev,
3175 struct device_attribute *attr, char *buf)
602adf40 3176{
593a9e7b 3177 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3178
0d7dbfce 3179 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3180}
3181
9bb2f334
AE
3182static ssize_t rbd_pool_id_show(struct device *dev,
3183 struct device_attribute *attr, char *buf)
3184{
3185 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
0d7dbfce 3187 return sprintf(buf, "%llu\n",
fc71d833 3188 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3189}
3190
dfc5606d
YS
3191static ssize_t rbd_name_show(struct device *dev,
3192 struct device_attribute *attr, char *buf)
3193{
593a9e7b 3194 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3195
a92ffdf8
AE
3196 if (rbd_dev->spec->image_name)
3197 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3198
3199 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3200}
3201
589d30e0
AE
3202static ssize_t rbd_image_id_show(struct device *dev,
3203 struct device_attribute *attr, char *buf)
3204{
3205 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3206
0d7dbfce 3207 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3208}
3209
34b13184
AE
3210/*
3211 * Shows the name of the currently-mapped snapshot (or
3212 * RBD_SNAP_HEAD_NAME for the base image).
3213 */
dfc5606d
YS
3214static ssize_t rbd_snap_show(struct device *dev,
3215 struct device_attribute *attr,
3216 char *buf)
3217{
593a9e7b 3218 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3219
0d7dbfce 3220 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3221}
3222
86b00e0d
AE
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;	/* advances past each formatted line */

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	/* image name may be unknown for a parent; fall back to a marker */
	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	/* total number of bytes written into the sysfs buffer */
	return (ssize_t) (bufp - buf);
}
3265
dfc5606d
YS
3266static ssize_t rbd_image_refresh(struct device *dev,
3267 struct device_attribute *attr,
3268 const char *buf,
3269 size_t size)
3270{
593a9e7b 3271 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3272 int ret;
602adf40 3273
117973fb 3274 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
3275
3276 return ret < 0 ? ret : size;
dfc5606d 3277}
602adf40 3278
/* Per-device sysfs attributes: all read-only except write-only "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* No per-device state to free here; lifetime handled by the rbd core. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
3324
8b8fb99c
AE
/* Take another reference on @spec; returns @spec for convenience. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}
3331
static void rbd_spec_free(struct kref *kref);
/* Drop a reference on @spec (NULL allowed); frees it on the last put. */
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}
3338
3339static struct rbd_spec *rbd_spec_alloc(void)
3340{
3341 struct rbd_spec *spec;
3342
3343 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3344 if (!spec)
3345 return NULL;
3346 kref_init(&spec->kref);
3347
8b8fb99c
AE
3348 return spec;
3349}
3350
/* kref release callback: free a spec and its dynamically-allocated names. */
static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
3361
/*
 * Allocate and initialize a new rbd_device.  The device records the
 * given client and spec pointers (rbd_dev_destroy() later drops both
 * references).  Returns NULL on allocation failure.
 */
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}
3389
/* Free an rbd_device, dropping its client and spec references. */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
3396
/* Free a snapshot entry along with its dynamically-allocated name. */
static void rbd_snap_destroy(struct rbd_snap *snap)
{
	kfree(snap->name);
	kfree(snap);
}
3402
6087b51b 3403static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
c8d18425 3404 const char *snap_name,
34b13184
AE
3405 u64 snap_id, u64 snap_size,
3406 u64 snap_features)
dfc5606d 3407{
4e891e0a 3408 struct rbd_snap *snap;
4e891e0a
AE
3409
3410 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 3411 if (!snap)
4e891e0a
AE
3412 return ERR_PTR(-ENOMEM);
3413
6e584f52 3414 snap->name = snap_name;
c8d18425
AE
3415 snap->id = snap_id;
3416 snap->size = snap_size;
34b13184 3417 snap->features = snap_features;
4e891e0a
AE
3418
3419 return snap;
dfc5606d
YS
3420}
3421
6e584f52
AE
3422/*
3423 * Returns a dynamically-allocated snapshot name if successful, or a
3424 * pointer-coded error otherwise.
3425 */
cb75223d 3426static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
cd892126
AE
3427 u64 *snap_size, u64 *snap_features)
3428{
cb75223d 3429 const char *snap_name;
6e584f52 3430 int i;
cd892126
AE
3431
3432 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3433
cd892126
AE
3434 /* Skip over names until we find the one we are looking for */
3435
3436 snap_name = rbd_dev->header.snap_names;
6e584f52 3437 for (i = 0; i < which; i++)
cd892126
AE
3438 snap_name += strlen(snap_name) + 1;
3439
6e584f52
AE
3440 snap_name = kstrdup(snap_name, GFP_KERNEL);
3441 if (!snap_name)
3442 return ERR_PTR(-ENOMEM);
3443
3444 *snap_size = rbd_dev->header.snap_sizes[which];
3445 *snap_features = 0; /* No features for v1 */
3446
cd892126
AE
3447 return snap_name;
3448}
3449
9d475de5
AE
3450/*
3451 * Get the size and object order for an image snapshot, or if
3452 * snap_id is CEPH_NOSNAP, gets this information for the base
3453 * image.
3454 */
3455static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3456 u8 *order, u64 *snap_size)
3457{
3458 __le64 snapid = cpu_to_le64(snap_id);
3459 int ret;
3460 struct {
3461 u8 order;
3462 __le64 size;
3463 } __attribute__ ((packed)) size_buf = { 0 };
3464
36be9a76 3465 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3466 "rbd", "get_size",
4157976b
AE
3467 &snapid, sizeof (snapid),
3468 &size_buf, sizeof (size_buf), NULL);
36be9a76 3469 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3470 if (ret < 0)
3471 return ret;
57385b51
AE
3472 if (ret < sizeof (size_buf))
3473 return -ERANGE;
9d475de5 3474
c86f86e9
AE
3475 if (order)
3476 *order = size_buf.order;
9d475de5
AE
3477 *snap_size = le64_to_cpu(size_buf.size);
3478
3479 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3480 (unsigned long long)snap_id, (unsigned int)*order,
3481 (unsigned long long)*snap_size);
9d475de5
AE
3482
3483 return 0;
3484}
3485
/* Fetch size and object order of the base image into the in-core header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
3492
1e130199
AE
/*
 * Fetch the object name prefix for a format 2 image and record the
 * dynamically-allocated copy in rbd_dev->header.object_prefix.
 * Returns 0 on success or a negative errno (header field left NULL).
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix", NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* ret is the number of reply bytes on success */
	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}
3526
b1b5402a
AE
/*
 * Fetch the feature bits for the given snapshot (or the base image
 * for CEPH_NOSNAP) into *snap_features.  Fails with -ENXIO if the
 * image uses incompatible features this driver doesn't support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				&snapid, sizeof (snapid),
				&features_buf, sizeof (features_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;	/* short reply */

	/* Refuse to map an image needing features we don't implement */
	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_SUPPORTED)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}
3561
/* Fetch the base image's feature bits into the in-core header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
3567
86b00e0d
AE
/*
 * Query the parent (pool id, image id, snap id, overlap) of a format
 * 2 image.  If a parent exists, a new parent spec is recorded in
 * rbd_dev; if not, returns 0 with no parent spec set.  Returns a
 * negative errno on failure.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +			/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
		sizeof (__le64) +			/* snap_id */
		sizeof (__le64);			/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				&snapid, sizeof (snapid),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;	/* ret = number of reply bytes */
	ret = -ERANGE;		/* default for decode failures below */
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (parent_spec->pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
			(unsigned long long)parent_spec->pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);	/* no-op once ownership transferred */

	return ret;
}
3639
cc070d59
AE
3640static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3641{
3642 struct {
3643 __le64 stripe_unit;
3644 __le64 stripe_count;
3645 } __attribute__ ((packed)) striping_info_buf = { 0 };
3646 size_t size = sizeof (striping_info_buf);
3647 void *p;
3648 u64 obj_size;
3649 u64 stripe_unit;
3650 u64 stripe_count;
3651 int ret;
3652
3653 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3654 "rbd", "get_stripe_unit_count", NULL, 0,
3655 (char *)&striping_info_buf, size, NULL);
3656 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3657 if (ret < 0)
3658 return ret;
3659 if (ret < size)
3660 return -ERANGE;
3661
3662 /*
3663 * We don't actually support the "fancy striping" feature
3664 * (STRIPINGV2) yet, but if the striping sizes are the
3665 * defaults the behavior is the same as before. So find
3666 * out, and only fail if the image has non-default values.
3667 */
3668 ret = -EINVAL;
3669 obj_size = (u64)1 << rbd_dev->header.obj_order;
3670 p = &striping_info_buf;
3671 stripe_unit = ceph_decode_64(&p);
3672 if (stripe_unit != obj_size) {
3673 rbd_warn(rbd_dev, "unsupported stripe unit "
3674 "(got %llu want %llu)",
3675 stripe_unit, obj_size);
3676 return -EINVAL;
3677 }
3678 stripe_count = ceph_decode_64(&p);
3679 if (stripe_count != 1) {
3680 rbd_warn(rbd_dev, "unsupported stripe count "
3681 "(got %llu want 1)", stripe_count);
3682 return -EINVAL;
3683 }
500d0c0f
AE
3684 rbd_dev->header.stripe_unit = stripe_unit;
3685 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3686
3687 return 0;
3688}
3689
9e15b77d
AE
/*
 * Look up the image name for rbd_dev's image id in the rbd directory
 * object.  Returns a dynamically-allocated name, or NULL on any
 * failure (callers treat a missing name as tolerable).
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	/* Encode the image id as a ceph string (length-prefixed) */
	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;	/* failure tolerated: report no name */
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
3739
/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 *
 * The set of snapshots for an image is not known until they have
 * been read by rbd_dev_snaps_update(), so we can't completely fill
 * in this information until after that has been called.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	/*
	 * An image being mapped will have the pool name (etc.), but
	 * we need to look up the snapshot id.
	 */
	if (spec->pool_name) {
		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
			struct rbd_snap *snap;

			snap = snap_by_name(rbd_dev, spec->snap_name);
			if (!snap)
				return -ENOENT;
			spec->snap_id = snap->id;
		} else {
			/* base image mapped: no snapshot id */
			spec->snap_id = CEPH_NOSNAP;
		}

		return 0;
	}

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name, and make a copy */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (!snap_name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
		ret = -EIO;
		goto out_err;
	}
	snap_name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	/* All lookups succeeded; hand the names to the spec */
	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;
out_err:
	kfree(image_name);
	kfree(pool_name);

	return ret;
}
3826
/*
 * Fetch the snapshot context (seq plus snapshot id array) for a
 * format 2 image and install it as rbd_dev->header.snapc.  @ver, if
 * non-NULL, receives the header object version from the class call.
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;	/* ret = number of reply bytes */
	ret = -ERANGE;		/* default for the decode steps below */
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				/ sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
3897
/*
 * Look up the name of the snapshot at position "which" in the
 * device's snapshot context, using the "rbd"/"get_snapshot_name"
 * class method on the header object.
 *
 * Returns a dynamically-allocated, NUL-terminated copy of the name
 * (caller must kfree() it) or an ERR_PTR-encoded errno on failure.
 */
cb75223d 3898static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
b8b1e2db
AE
3899{
3900	size_t size;
3901	void *reply_buf;
3902	__le64 snap_id;
3903	int ret;
3904	void *p;
3905	void *end;
b8b1e2db
AE
3906	char *snap_name;
3907
3908	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3909	reply_buf = kmalloc(size, GFP_KERNEL);
3910	if (!reply_buf)
3911	return ERR_PTR(-ENOMEM);
3912
acb1b6ca 3913	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
b8b1e2db 3914	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
36be9a76 3915	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 3916	"rbd", "get_snapshot_name",
4157976b 3917	&snap_id, sizeof (snap_id),
07b2391f 3918	reply_buf, size, NULL);
36be9a76 3919	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
3920	if (ret < 0) {
3921	snap_name = ERR_PTR(ret);
b8b1e2db 3922	goto out;
f40eb349 3923	}
b8b1e2db
AE
3924
	/* Reply holds a length-prefixed string; extract an allocated copy */
3925	p = reply_buf;
f40eb349 3926	end = reply_buf + ret;
e5c35534 3927	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 3928	if (IS_ERR(snap_name))
b8b1e2db 3929	goto out;
b8b1e2db 3930
f40eb349
AE
3931	dout(" snap_id 0x%016llx snap_name = %s\n",
3932	(unsigned long long)le64_to_cpu(snap_id), snap_name);
b8b1e2db
AE
3933out:
3934	kfree(reply_buf);
3935
f40eb349 3936	return snap_name;
b8b1e2db
AE
3937}
3938
/*
 * Gather the size, features, and name for the snapshot at position
 * "which" in a format 2 image's snapshot context.  *snap_size and
 * *snap_features are only filled in when the name lookup succeeds.
 *
 * Returns the (allocated) snapshot name, or an ERR_PTR-encoded
 * errno if any of the three lookups fails.
 */
cb75223d 3939static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
b8b1e2db
AE
3940	u64 *snap_size, u64 *snap_features)
3941{
e0b49868 3942	u64 snap_id;
acb1b6ca
AE
3943	u64 size;
3944	u64 features;
cb75223d 3945	const char *snap_name;
b8b1e2db
AE
3946	int ret;
3947
acb1b6ca 3948	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
b8b1e2db 3949	snap_id = rbd_dev->header.snapc->snaps[which];
acb1b6ca 3950	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
b8b1e2db 3951	if (ret)
acb1b6ca
AE
3952	goto out_err;
3953
3954	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
b8b1e2db 3955	if (ret)
acb1b6ca
AE
3956	goto out_err;
3957
3958	snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3959	if (!IS_ERR(snap_name)) {
3960	*snap_size = size;
3961	*snap_features = features;
3962	}
b8b1e2db 3963
acb1b6ca
AE
3964	return snap_name;
3965out_err:
3966	return ERR_PTR(ret);
b8b1e2db
AE
3967}
3968
cb75223d 3969static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
b8b1e2db
AE
3970 u64 *snap_size, u64 *snap_features)
3971{
3972 if (rbd_dev->image_format == 1)
3973 return rbd_dev_v1_snap_info(rbd_dev, which,
3974 snap_size, snap_features);
3975 if (rbd_dev->image_format == 2)
3976 return rbd_dev_v2_snap_info(rbd_dev, which,
3977 snap_size, snap_features);
3978 return ERR_PTR(-EINVAL);
3979}
3980
117973fb
AE
/*
 * Re-read a format 2 image's size and snapshot context from the
 * server and bring the in-memory snapshot list up to date.  All of
 * it happens under the header rwsem held for write.  *hver is
 * forwarded to rbd_dev_v2_snap_context() (header version out-param).
 * Returns 0 on success or a negative errno.
 */
3981static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3982{
3983	int ret;
117973fb
AE
3984
3985	down_write(&rbd_dev->header_rwsem);
3986
117973fb
AE
3987	ret = rbd_dev_v2_image_size(rbd_dev);
3988	if (ret)
3989	goto out;
117973fb
AE
3990	rbd_update_mapping_size(rbd_dev);
3991
3992	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3993	dout("rbd_dev_v2_snap_context returned %d\n", ret);
3994	if (ret)
3995	goto out;
3996	ret = rbd_dev_snaps_update(rbd_dev);
3997	dout("rbd_dev_snaps_update returned %d\n", ret);
3998	if (ret)
3999	goto out;
117973fb
AE
4000out:
4001	up_write(&rbd_dev->header_rwsem);
4002
4003	return ret;
4004}
4005
dfc5606d 4006/*
35938150
AE
4007 * Scan the rbd device's current snapshot list and compare it to the
4008 * newly-received snapshot context.  Remove any existing snapshots
4009 * not present in the new snapshot context.  Add a new snapshot for
4010 * any snaphots in the snapshot context not in the current list.
4011 * And verify there are no changes to snapshots we already know
4012 * about.
4013 *
4014 * Assumes the snapshots in the snapshot context are sorted by
4015 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4016 * are also maintained in that order.)
522a0cc0
AE
4017 *
4018 * Note that any error occurs while updating the snapshot list
4019 * aborts the update, and the entire list is cleared.  The snapshot
4020 * list becomes inconsistent at that point anyway, so it might as
4021 * well be empty.
dfc5606d 4022 */
304f6808 4023static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 4024{
35938150
AE
4025	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4026	const u32 snap_count = snapc->num_snaps;
35938150
AE
4027	struct list_head *head = &rbd_dev->snaps;
4028	struct list_head *links = head->next;
4029	u32 index = 0;
522a0cc0 4030	int ret = 0;
dfc5606d 4031
522a0cc0 4032	dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
	/* Merge-walk both sorted sequences: context ids and existing list */
35938150
AE
4033	while (index < snap_count || links != head) {
4034	u64 snap_id;
4035	struct rbd_snap *snap;
cb75223d 4036	const char *snap_name;
cd892126
AE
4037	u64 snap_size = 0;
4038	u64 snap_features = 0;
dfc5606d 4039
35938150
AE
4040	snap_id = index < snap_count ? snapc->snaps[index]
4041	: CEPH_NOSNAP;
4042	snap = links != head ? list_entry(links, struct rbd_snap, node)
4043	: NULL;
aafb230e 4044	rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 4045
35938150
AE
4046	if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4047	struct list_head *next = links->next;
dfc5606d 4048
6d292906
AE
4049	/*
4050	 * A previously-existing snapshot is not in
4051	 * the new snap context.
4052	 *
522a0cc0
AE
4053	 * If the now-missing snapshot is the one
4054	 * the image represents, clear its existence
4055	 * flag so we can avoid sending any more
4056	 * requests to it.
6d292906 4057	 */
0d7dbfce 4058	if (rbd_dev->spec->snap_id == snap->id)
6d292906 4059	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3e83b65b 4060	dout("removing %ssnap id %llu\n",
0d7dbfce
AE
4061	rbd_dev->spec->snap_id == snap->id ?
4062	"mapped " : "",
522a0cc0 4063	(unsigned long long)snap->id);
6087b51b
AE
4064
4065	list_del(&snap->node);
4066	rbd_snap_destroy(snap);
35938150
AE
4067
4068	/* Done with this list entry; advance */
4069
4070	links = next;
dfc5606d
YS
4071	continue;
4072	}
35938150 4073
b8b1e2db
AE
4074	snap_name = rbd_dev_snap_info(rbd_dev, index,
4075	&snap_size, &snap_features);
522a0cc0
AE
4076	if (IS_ERR(snap_name)) {
4077	ret = PTR_ERR(snap_name);
4078	dout("failed to get snap info, error %d\n", ret);
4079	goto out_err;
4080	}
cd892126 4081
	/*
	 * NOTE(review): this prints snap_count for every entry; it
	 * looks like "index" was intended -- confirm before changing.
	 */
522a0cc0
AE
4082	dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4083	(unsigned long long)snap_id);
35938150
AE
4084	if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4085	struct rbd_snap *new_snap;
4086
4087	/* We haven't seen this snapshot before */
4088
6087b51b 4089	new_snap = rbd_snap_create(rbd_dev, snap_name,
cd892126 4090	snap_id, snap_size, snap_features);
9fcbb800 4091	if (IS_ERR(new_snap)) {
522a0cc0
AE
4092	ret = PTR_ERR(new_snap);
4093	dout(" failed to add dev, error %d\n", ret);
4094	goto out_err;
9fcbb800 4095	}
35938150
AE
4096
4097	/* New goes before existing, or at end of list */
4098
	/* NOTE(review): " at end\n" plus the format's own \n doubles the newline */
9fcbb800 4099	dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
4100	if (snap)
4101	list_add_tail(&new_snap->node, &snap->node);
4102	else
523f3258 4103	list_add_tail(&new_snap->node, head);
35938150
AE
4104	} else {
4105	/* Already have this one */
4106
9fcbb800
AE
4107	dout(" already present\n");
4108
cd892126 4109	rbd_assert(snap->size == snap_size);
aafb230e 4110	rbd_assert(!strcmp(snap->name, snap_name));
cd892126 4111	rbd_assert(snap->features == snap_features);
35938150
AE
4112
4113	/* Done with this list entry; advance */
4114
4115	links = links->next;
dfc5606d 4116	}
35938150
AE
4117
4118	/* Advance to the next entry in the snapshot context */
4119
4120	index++;
dfc5606d 4121	}
9fcbb800 4122	dout("%s: done\n", __func__);
dfc5606d
YS
4123
4124	return 0;
522a0cc0
AE
4125out_err:
4126	rbd_remove_all_snaps(rbd_dev);
4127
4128	return ret;
dfc5606d
YS
4129}
4130
dfc5606d
YS
/*
 * Initialize the rbd_dev's embedded struct device and register it
 * with the driver core (bus/type/parent/release all point at rbd
 * globals; the device name is the numeric dev_id).  Registration is
 * serialized under ctl_mutex.  Returns the device_register() result.
 */
4131static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4132{
dfc5606d 4133	struct device *dev;
cd789ab9 4134	int ret;
dfc5606d
YS
4135
4136	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4137
cd789ab9 4138	dev = &rbd_dev->dev;
dfc5606d
YS
4139	dev->bus = &rbd_bus_type;
4140	dev->type = &rbd_device_type;
4141	dev->parent = &rbd_root_dev;
200a6a8b 4142	dev->release = rbd_dev_device_release;
de71a297 4143	dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4144	ret = device_register(dev);
dfc5606d 4145
dfc5606d 4146	mutex_unlock(&ctl_mutex);
cd789ab9 4147
dfc5606d 4148	return ret;
602adf40
YS
4149}
4150
dfc5606d
YS
/* Unregister the rbd_dev's device from the driver core (undoes rbd_bus_add_dev()) */
4151static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4152{
4153	device_unregister(&rbd_dev->dev);
4154}
4155
/* Highest device id handed out so far; ids start at 1 */
e2839308 4156static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4157
4158/*
499afd5b
AE
4159 * Get a unique rbd identifier for the given new rbd_dev, and add
4160 * the rbd_dev to the global list.  The minimum rbd id is 1.
1ddbe94e 4161 */
e2839308 4162static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4163{
	/* atomic increment makes concurrent callers get distinct ids */
e2839308 4164	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4165
4166	spin_lock(&rbd_dev_list_lock);
4167	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4168	spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4169	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4170	(unsigned long long) rbd_dev->dev_id);
1ddbe94e 4171}
b7f23c36 4172
1ddbe94e 4173/*
499afd5b
AE
4174 * Remove an rbd_dev from the global list, and record that its
4175 * identifier is no longer in use.
1ddbe94e 4176 */
e2839308 4177static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4178{
d184f6bf 4179	struct list_head *tmp;
de71a297 4180	int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4181	int max_id;
4182
aafb230e 4183	rbd_assert(rbd_id > 0);
499afd5b 4184
e2839308
AE
4185	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4186	(unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4187	spin_lock(&rbd_dev_list_lock);
4188	list_del_init(&rbd_dev->node);
d184f6bf
AE
4189
4190	/*
4191	 * If the id being "put" is not the current maximum, there
4192	 * is nothing special we need to do.
4193	 */
e2839308 4194	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4195	spin_unlock(&rbd_dev_list_lock);
4196	return;
4197	}
4198
4199	/*
4200	 * We need to update the current maximum id.  Search the
4201	 * list to find out what it is.  We're more likely to find
4202	 * the maximum at the end, so search the list backward.
4203	 */
	/* List scan runs with rbd_dev_list_lock still held */
4204	max_id = 0;
4205	list_for_each_prev(tmp, &rbd_dev_list) {
4206	struct rbd_device *rbd_dev;
4207
4208	rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4209	if (rbd_dev->dev_id > max_id)
4210	max_id = rbd_dev->dev_id;
d184f6bf 4211	}
499afd5b 4212	spin_unlock(&rbd_dev_list_lock);
b7f23c36 4213
1ddbe94e 4214	/*
e2839308 4215	 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4216	 * which case it now accurately reflects the new maximum.
4217	 * Be careful not to overwrite the maximum value in that
4218	 * case.
1ddbe94e 4219	 */
e2839308
AE
4220	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4221	dout("  max dev id has been reset\n");
b7f23c36
AE
4222}
4223
/*
 * Skip any leading white space at *buf, updating *buf to point at
 * the first non-space character (if any), and return the length of
 * the token (run of non-space characters) that starts there.  The
 * string at *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters for which isspace() is nonzero in the C/POSIX locales */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* skip to token start */
	*buf = start;

	return strcspn(start, whitespace);	/* token length */
}
4242
/*
 * Find the next token in *buf and, if it fits in the provided
 * buffer (including room for a terminating '\0'), copy it there
 * NUL-terminated.  *buf must be '\0'-terminated on entry.
 *
 * Returns the token's length (excluding the '\0'): 0 if no token
 * was found, or >= token_size if the token would not fit.
 *
 * *buf is advanced past the token in every case, even when the
 * token was too large to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
4272
ea3352f4
AE
4273/*
4274 * Finds the next token in *buf, dynamically allocates a buffer big
4275 * enough to hold a copy of it, and copies the token into the new
4276 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4277 * that a duplicate buffer is created even for a zero-length token.
4278 *
4279 * Returns a pointer to the newly-allocated duplicate, or a null
4280 * pointer if memory for the duplicate was not available. If
4281 * the lenp argument is a non-null pointer, the length of the token
4282 * (not including the '\0') is returned in *lenp.
4283 *
4284 * If successful, the *buf pointer will be updated to point beyond
4285 * the end of the found token.
4286 *
4287 * Note: uses GFP_KERNEL for allocation.
4288 */
4289static inline char *dup_token(const char **buf, size_t *lenp)
4290{
4291 char *dup;
4292 size_t len;
4293
4294 len = next_token(buf);
4caf35f9 4295 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4296 if (!dup)
4297 return NULL;
ea3352f4
AE
4298 *(dup + len) = '\0';
4299 *buf += len;
4300
4301 if (lenp)
4302 *lenp = len;
4303
4304 return dup;
4305}
4306
a725f65e 4307/*
859c31df
AE
4308 * Parse the options provided for an "rbd add" (i.e., rbd image
4309 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4310 * and the data written is passed here via a NUL-terminated buffer.
4311 * Returns 0 if successful or an error code otherwise.
d22f76e7 4312 *
859c31df
AE
4313 * The information extracted from these options is recorded in
4314 * the other parameters which return dynamically-allocated
4315 * structures:
4316 *  ceph_opts
4317 *      The address of a pointer that will refer to a ceph options
4318 *      structure.  Caller must release the returned pointer using
4319 *      ceph_destroy_options() when it is no longer needed.
4320 *  rbd_opts
4321 *      Address of an rbd options pointer.  Fully initialized by
4322 *      this function; caller must release with kfree().
4323 *  spec
4324 *      Address of an rbd image specification pointer.  Fully
4325 *      initialized by this function based on parsed options.
4326 *      Caller must release with rbd_spec_put().
4327 *
4328 * The options passed take this form:
4329 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4330 * where:
4331 *  <mon_addrs>
4332 *      A comma-separated list of one or more monitor addresses.
4333 *      A monitor address is an ip address, optionally followed
4334 *      by a port number (separated by a colon).
4335 *        I.e.:  ip1[:port1][,ip2[:port2]...]
4336 *  <options>
4337 *      A comma-separated list of ceph and/or rbd options.
4338 *  <pool_name>
4339 *      The name of the rados pool containing the rbd image.
4340 *  <image_name>
4341 *      The name of the image in that pool to map.
4342 *  <snap_name>
4343 *      An optional snapshot name.  If provided, the mapping will
4344 *      present data from the image at the time that snapshot was
4345 *      created.  The image head is used if no snapshot name is
4346 *      provided.  Snapshot mappings are always read-only.
a725f65e 4347 */
859c31df 4348static int rbd_add_parse_args(const char *buf,
dc79b113 4349	struct ceph_options **ceph_opts,
859c31df
AE
4350	struct rbd_options **opts,
4351	struct rbd_spec **rbd_spec)
e28fff26 4352{
d22f76e7 4353	size_t len;
859c31df 4354	char *options;
0ddebc0c 4355	const char *mon_addrs;
ecb4dc22 4356	char *snap_name;
0ddebc0c 4357	size_t mon_addrs_size;
859c31df 4358	struct rbd_spec *spec = NULL;
4e9afeba 4359	struct rbd_options *rbd_opts = NULL;
859c31df 4360	struct ceph_options *copts;
dc79b113 4361	int ret;
e28fff26
AE
4362
4363	/* The first four tokens are required */
4364
	/* Monitor addresses are left in place; only length is recorded */
7ef3214a 4365	len = next_token(&buf);
4fb5d671
AE
4366	if (!len) {
4367	rbd_warn(NULL, "no monitor address(es) provided");
4368	return -EINVAL;
4369	}
0ddebc0c 4370	mon_addrs = buf;
f28e565a 4371	mon_addrs_size = len + 1;
7ef3214a 4372	buf += len;
a725f65e 4373
dc79b113 4374	ret = -EINVAL;
f28e565a
AE
4375	options = dup_token(&buf, NULL);
4376	if (!options)
dc79b113 4377	return -ENOMEM;
4fb5d671
AE
4378	if (!*options) {
4379	rbd_warn(NULL, "no options provided");
4380	goto out_err;
4381	}
e28fff26 4382
859c31df
AE
4383	spec = rbd_spec_alloc();
4384	if (!spec)
f28e565a 4385	goto out_mem;
859c31df
AE
4386
4387	spec->pool_name = dup_token(&buf, NULL);
4388	if (!spec->pool_name)
4389	goto out_mem;
4fb5d671
AE
4390	if (!*spec->pool_name) {
4391	rbd_warn(NULL, "no pool name provided");
4392	goto out_err;
4393	}
e28fff26 4394
69e7a02f 4395	spec->image_name = dup_token(&buf, NULL);
859c31df 4396	if (!spec->image_name)
f28e565a 4397	goto out_mem;
4fb5d671
AE
4398	if (!*spec->image_name) {
4399	rbd_warn(NULL, "no image name provided");
4400	goto out_err;
4401	}
d4b125e9 4402
f28e565a
AE
4403	/*
4404	 * Snapshot name is optional; default is to use "-"
4405	 * (indicating the head/no snapshot).
4406	 */
3feeb894 4407	len = next_token(&buf);
820a5f3e 4408	if (!len) {
3feeb894
AE
4409	buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4410	len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4411	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4412	ret = -ENAMETOOLONG;
f28e565a 4413	goto out_err;
849b4260 4414	}
ecb4dc22
AE
4415	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4416	if (!snap_name)
f28e565a 4417	goto out_mem;
ecb4dc22
AE
4418	*(snap_name + len) = '\0';
4419	spec->snap_name = snap_name;
e5c35534 4420
0ddebc0c 4421	/* Initialize all rbd options to the defaults */
e28fff26 4422
4e9afeba
AE
4423	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4424	if (!rbd_opts)
4425	goto out_mem;
4426
4427	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4428
	/* ceph_parse_options() consumes rbd-specific tokens via the callback */
859c31df 4429	copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4430	mon_addrs + mon_addrs_size - 1,
4e9afeba 4431	parse_rbd_opts_token, rbd_opts);
859c31df
AE
4432	if (IS_ERR(copts)) {
4433	ret = PTR_ERR(copts);
dc79b113
AE
4434	goto out_err;
4435	}
859c31df
AE
4436	kfree(options);
4437
4438	*ceph_opts = copts;
4e9afeba 4439	*opts = rbd_opts;
859c31df 4440	*rbd_spec = spec;
0ddebc0c 4441
dc79b113 4442	return 0;
f28e565a 4443out_mem:
dc79b113 4444	ret = -ENOMEM;
d22f76e7 4445out_err:
859c31df
AE
4446	kfree(rbd_opts);
4447	rbd_spec_put(spec);
f28e565a 4448	kfree(options);
d22f76e7 4449
dc79b113 4450	return ret;
a725f65e
AE
4451}
4452
589d30e0
AE
4453/*
4454 * An rbd format 2 image has a unique identifier, distinct from the
4455 * name given to it by the user.  Internally, that identifier is
4456 * what's used to specify the names of objects related to the image.
4457 *
4458 * A special "rbd id" object is used to map an rbd image name to its
4459 * id.  If that object doesn't exist, then there is no v2 rbd image
4460 * with the supplied name.
4461 *
4462 * This function will record the given rbd_dev's image_id field if
4463 * it can be determined, and in that case will return 0.  If any
4464 * errors occur a negative errno will be returned and the rbd_dev's
4465 * image_id field will be unchanged (and should be NULL).
4466 */
4467static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4468{
4469	int ret;
4470	size_t size;
4471	char *object_name;
4472	void *response;
c0fba368 4473	char *image_id;
2f82ee54 4474
2c0d0a10
AE
4475	/*
4476	 * When probing a parent image, the image id is already
4477	 * known (and the image name likely is not).  There's no
c0fba368
AE
4478	 * need to fetch the image id again in this case.  We
4479	 * do still need to set the image format though.
2c0d0a10 4480	 */
	/* Empty image_id means format 1; non-empty means format 2 */
c0fba368
AE
4481	if (rbd_dev->spec->image_id) {
4482	rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4483
2c0d0a10 4484	return 0;
c0fba368 4485	}
2c0d0a10 4486
589d30e0
AE
4487	/*
4488	 * First, see if the format 2 image id file exists, and if
4489	 * so, get the image's persistent id from it.
4490	 */
69e7a02f 4491	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4492	object_name = kmalloc(size, GFP_NOIO);
4493	if (!object_name)
4494	return -ENOMEM;
0d7dbfce 4495	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4496	dout("rbd id object name is %s\n", object_name);
4497
4498	/* Response will be an encoded string, which includes a length */
4499
4500	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4501	response = kzalloc(size, GFP_NOIO);
4502	if (!response) {
4503	ret = -ENOMEM;
4504	goto out;
4505	}
4506
c0fba368
AE
4507	/* If it doesn't exist we'll assume it's a format 1 image */
4508
36be9a76 4509	ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4510	"rbd", "get_id", NULL, 0,
07b2391f 4511	response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 4512	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4513	if (ret == -ENOENT) {
4514	image_id = kstrdup("", GFP_KERNEL);
4515	ret = image_id ? 0 : -ENOMEM;
4516	if (!ret)
4517	rbd_dev->image_format = 1;
4518	} else if (ret > sizeof (__le32)) {
4519	void *p = response;
4520
4521	image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4522	NULL, GFP_NOIO);
c0fba368
AE
4523	ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4524	if (!ret)
4525	rbd_dev->image_format = 2;
589d30e0 4526	} else {
c0fba368
AE
4527	ret = -EINVAL;
4528	}
4529
4530	if (!ret) {
4531	rbd_dev->spec->image_id = image_id;
4532	dout("image_id is %s\n", image_id);
589d30e0
AE
4533	}
4534out:
4535	kfree(response);
4536	kfree(object_name);
4537
4538	return ret;
4539}
4540
6fd48b3b
AE
4541/* Undo whatever state changes are made by v1 or v2 image probe */
4542
4543static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4544{
4545	struct rbd_image_header *header;
4546
	/* Drop parent linkage established during probe */
4547	rbd_dev_remove_parent(rbd_dev);
4548	rbd_spec_put(rbd_dev->parent_spec);
4549	rbd_dev->parent_spec = NULL;
4550	rbd_dev->parent_overlap = 0;
4551
4552	/* Free dynamic fields from the header, then zero it out */
4553
4554	header = &rbd_dev->header;
812164f8 4555	ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4556	kfree(header->snap_sizes);
4557	kfree(header->snap_names);
4558	kfree(header->object_prefix);
4559	memset(header, 0, sizeof (*header));
4560}
4561
a30b71b9
AE
/*
 * Probe a format 1 (old-style) image: read its on-disk header into
 * rbd_dev->header.  On failure the header name and image id set up
 * by the caller are released.  Returns 0 or a negative errno.
 */
4562static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4563{
4564	int ret;
a30b71b9
AE
4565
4566	/* Populate rbd image metadata */
4567
4568	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4569	if (ret < 0)
4570	goto out_err;
86b00e0d
AE
4571
4572	/* Version 1 images have no parent (no layering) */
4573
4574	rbd_dev->parent_spec = NULL;
4575	rbd_dev->parent_overlap = 0;
4576
a30b71b9
AE
4577	dout("discovered version 1 image, header name is %s\n",
4578	rbd_dev->header_name);
4579
4580	return 0;
4581
4582out_err:
4583	kfree(rbd_dev->header_name);
4584	rbd_dev->header_name = NULL;
0d7dbfce
AE
4585	kfree(rbd_dev->spec->image_id);
4586	rbd_dev->spec->image_id = NULL;
a30b71b9
AE
4587
4588	return ret;
4589}
4590
/*
 * Probe a format 2 image: fetch size, object prefix, features,
 * optional parent (layering) and striping parameters, and the
 * snapshot context.  On any failure, everything acquired here is
 * released and the caller-established header name/image id are
 * freed.  Returns 0 or a negative errno.
 */
4591static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4592{
9d475de5 4593	int ret;
6e14b1a6 4594	u64 ver = 0;
a30b71b9 4595
9d475de5 4596	ret = rbd_dev_v2_image_size(rbd_dev);
57385b51 4597	if (ret)
1e130199
AE
4598	goto out_err;
4599
4600	/* Get the object prefix (a.k.a. block_name) for the image */
4601
4602	ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4603	if (ret)
b1b5402a
AE
4604	goto out_err;
4605
d889140c 4606	/* Get the and check features for the image */
b1b5402a
AE
4607
4608	ret = rbd_dev_v2_features(rbd_dev);
57385b51 4609	if (ret)
9d475de5 4610	goto out_err;
35d489f9 4611
86b00e0d
AE
4612	/* If the image supports layering, get the parent info */
4613
4614	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4615	ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4616	if (ret)
86b00e0d 4617	goto out_err;
96882f55
AE
4618
4619	/*
4620	 * Don't print a warning for parent images.  We can
4621	 * tell this point because we won't know its pool
4622	 * name yet (just its pool id).
4623	 */
4624	if (rbd_dev->spec->pool_name)
4625	rbd_warn(rbd_dev, "WARNING: kernel layering "
4626	"is EXPERIMENTAL!");
86b00e0d
AE
4627	}
4628
cc070d59
AE
4629	/* If the image supports fancy striping, get its parameters */
4630
4631	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4632	ret = rbd_dev_v2_striping_info(rbd_dev);
4633	if (ret < 0)
4634	goto out_err;
4635	}
4636
6e14b1a6
AE
4637	/* crypto and compression type aren't (yet) supported for v2 images */
4638
4639	rbd_dev->header.crypt_type = 0;
4640	rbd_dev->header.comp_type = 0;
35d489f9 4641
6e14b1a6
AE
4642	/* Get the snapshot context, plus the header version */
4643
	/* ver is fetched but otherwise unused here (local, discarded) */
4644	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
4645	if (ret)
4646	goto out_err;
6e14b1a6 4647
a30b71b9
AE
4648	dout("discovered version 2 image, header name is %s\n",
4649	rbd_dev->header_name);
4650
35152979 4651	return 0;
9d475de5 4652out_err:
86b00e0d
AE
4653	rbd_dev->parent_overlap = 0;
4654	rbd_spec_put(rbd_dev->parent_spec);
4655	rbd_dev->parent_spec = NULL;
9d475de5
AE
4656	kfree(rbd_dev->header_name);
4657	rbd_dev->header_name = NULL;
1e130199
AE
4658	kfree(rbd_dev->header.object_prefix);
4659	rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4660
4661	return ret;
a30b71b9
AE
4662}
4663
/*
 * If this image has a parent (layering), create and probe an
 * rbd_device for it, sharing this device's client and parent spec.
 * No-op (returns 0) when there is no parent spec.
 */
124afba2 4664static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4665{
2f82ee54 4666	struct rbd_device *parent = NULL;
124afba2
AE
4667	struct rbd_spec *parent_spec;
4668	struct rbd_client *rbdc;
4669	int ret;
4670
4671	if (!rbd_dev->parent_spec)
4672	return 0;
4673	/*
4674	 * We need to pass a reference to the client and the parent
4675	 * spec when creating the parent rbd_dev.  Images related by
4676	 * parent/child relationships always share both.
4677	 */
4678	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4679	rbdc = __rbd_get_client(rbd_dev->rbd_client);
4680
4681	ret = -ENOMEM;
4682	parent = rbd_dev_create(rbdc, parent_spec);
4683	if (!parent)
4684	goto out_err;
4685
	/* Recursive probe: the parent may itself have a parent */
4686	ret = rbd_dev_image_probe(parent);
4687	if (ret < 0)
4688	goto out_err;
4689	rbd_dev->parent = parent;
4690
4691	return 0;
4692out_err:
4693	if (parent) {
	/*
	 * NOTE(review): this frees rbd_dev->header_name (the child's)
	 * without NULLing it, and puts parent_spec while parent still
	 * holds it -- looks suspect; confirm the intended cleanup.
	 */
4694	rbd_spec_put(rbd_dev->parent_spec);
4695	kfree(rbd_dev->header_name);
4696	rbd_dev_destroy(parent);
4697	} else {
4698	rbd_put_client(rbdc);
4699	rbd_spec_put(parent_spec);
4700	}
4701
4702	return ret;
4703}
4704
/*
 * Set up the Linux block-device side of an already-probed image:
 * mapping, device id, block major number, gendisk, sysfs device,
 * then announce the disk.  Each step is unwound on failure.
 * Returns 0 or a negative errno.
 */
200a6a8b 4705static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4706{
83a06263 4707	int ret;
d1cf5788
AE
4708
4709	ret = rbd_dev_mapping_set(rbd_dev);
83a06263 4710	if (ret)
9bb81c9b 4711	return ret;
5de10f3b 4712
83a06263
AE
4713	/* generate unique id: find highest unique id, add one */
4714	rbd_dev_id_get(rbd_dev);
4715
4716	/* Fill in the device name, now that we have its id. */
4717	BUILD_BUG_ON(DEV_NAME_LEN
4718	< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4719	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4720
4721	/* Get our block major device number. */
4722
4723	ret = register_blkdev(0, rbd_dev->name);
4724	if (ret < 0)
4725	goto err_out_id;
4726	rbd_dev->major = ret;
4727
4728	/* Set up the blkdev mapping. */
4729
4730	ret = rbd_init_disk(rbd_dev);
4731	if (ret)
4732	goto err_out_blkdev;
4733
4734	ret = rbd_bus_add_dev(rbd_dev);
4735	if (ret)
4736	goto err_out_disk;
4737
83a06263
AE
4738	/* Everything's ready.  Announce the disk to the world. */
4739
b5156e76 4740	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
129b79d4 4741	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4742	add_disk(rbd_dev->disk);
4743
4744	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4745	(unsigned long long) rbd_dev->mapping.size);
4746
4747	return ret;
2f82ee54 4748
83a06263
AE
4749err_out_disk:
4750	rbd_free_disk(rbd_dev);
4751err_out_blkdev:
4752	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4753err_out_id:
4754	rbd_dev_id_put(rbd_dev);
d1cf5788 4755	rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4756
4757	return ret;
4758}
4759
332bb12d
AE
4760static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4761{
4762 struct rbd_spec *spec = rbd_dev->spec;
4763 size_t size;
4764
4765 /* Record the header object name for this rbd image. */
4766
4767 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4768
4769 if (rbd_dev->image_format == 1)
4770 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4771 else
4772 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4773
4774 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4775 if (!rbd_dev->header_name)
4776 return -ENOMEM;
4777
4778 if (rbd_dev->image_format == 1)
4779 sprintf(rbd_dev->header_name, "%s%s",
4780 spec->image_name, RBD_SUFFIX);
4781 else
4782 sprintf(rbd_dev->header_name, "%s%s",
4783 RBD_HEADER_PREFIX, spec->image_id);
4784 return 0;
4785}
4786
200a6a8b
AE
/*
 * Tear down everything image probe established: snapshots, probed
 * header state, the header watch, the header name, image format and
 * image id -- then destroy the rbd_dev itself.
 */
4787static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4788{
6fd48b3b
AE
4789	int ret;
4790
4791	rbd_remove_all_snaps(rbd_dev);
4792	rbd_dev_unprobe(rbd_dev);
	/* Second arg 0 = cancel the header watch request */
4793	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4794	if (ret)
4795	rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
200a6a8b 4796	kfree(rbd_dev->header_name);
6fd48b3b
AE
4797	rbd_dev->header_name = NULL;
4798	rbd_dev->image_format = 0;
4799	kfree(rbd_dev->spec->image_id);
4800	rbd_dev->spec->image_id = NULL;
4801
200a6a8b
AE
4802	rbd_dev_destroy(rbd_dev);
4803}
4804
a30b71b9
AE
4805/*
4806 * Probe for the existence of the header object for the given rbd
4807 * device.  For format 2 images this includes determining the image
4808 * id.
4809 */
71f293e2 4810static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
a30b71b9
AE
4811{
4812	int ret;
b644de2b 4813	int tmp;
a30b71b9
AE
4814
4815	/*
4816	 * Get the id from the image id object.  If it's not a
4817	 * format 2 image, we'll get ENOENT back, and we'll assume
4818	 * it's a format 1 image.
4819	 */
4820	ret = rbd_dev_image_id(rbd_dev);
4821	if (ret)
c0fba368
AE
4822	return ret;
4823	rbd_assert(rbd_dev->spec->image_id);
4824	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4825
332bb12d
AE
4826	ret = rbd_dev_header_name(rbd_dev);
4827	if (ret)
4828	goto err_out_format;
4829
	/* Second arg 1 = establish the header watch request */
b644de2b
AE
4830	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4831	if (ret)
4832	goto out_header_name;
4833
c0fba368 4834	if (rbd_dev->image_format == 1)
a30b71b9
AE
4835	ret = rbd_dev_v1_probe(rbd_dev);
4836	else
4837	ret = rbd_dev_v2_probe(rbd_dev);
5655c4d9 4838	if (ret)
b644de2b 4839	goto err_out_watch;
83a06263 4840
9bb81c9b
AE
4841	ret = rbd_dev_snaps_update(rbd_dev);
4842	if (ret)
6fd48b3b 4843	goto err_out_probe;
9bb81c9b
AE
4844
4845	ret = rbd_dev_spec_update(rbd_dev);
4846	if (ret)
4847	goto err_out_snaps;
4848
4849	ret = rbd_dev_probe_parent(rbd_dev);
6fd48b3b
AE
4850	if (!ret)
4851	return 0;
83a06263 4852
	/* Unwind in reverse order of the steps above */
9bb81c9b
AE
4853err_out_snaps:
4854	rbd_remove_all_snaps(rbd_dev);
6fd48b3b
AE
4855err_out_probe:
4856	rbd_dev_unprobe(rbd_dev);
b644de2b
AE
4857err_out_watch:
4858	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4859	if (tmp)
4860	rbd_warn(rbd_dev, "unable to tear down watch request\n");
332bb12d
AE
4861out_header_name:
4862	kfree(rbd_dev->header_name);
4863	rbd_dev->header_name = NULL;
4864err_out_format:
4865	rbd_dev->image_format = 0;
5655c4d9
AE
4866	kfree(rbd_dev->spec->image_id);
4867	rbd_dev->spec->image_id = NULL;
4868
4869	dout("probe failed, returning %d\n", ret);
4870
a30b71b9
AE
4871	return ret;
4872}
4873
59c2be1e
YS
4874static ssize_t rbd_add(struct bus_type *bus,
4875 const char *buf,
4876 size_t count)
602adf40 4877{
cb8627c7 4878 struct rbd_device *rbd_dev = NULL;
dc79b113 4879 struct ceph_options *ceph_opts = NULL;
4e9afeba 4880 struct rbd_options *rbd_opts = NULL;
859c31df 4881 struct rbd_spec *spec = NULL;
9d3997fd 4882 struct rbd_client *rbdc;
27cc2594
AE
4883 struct ceph_osd_client *osdc;
4884 int rc = -ENOMEM;
602adf40
YS
4885
4886 if (!try_module_get(THIS_MODULE))
4887 return -ENODEV;
4888
602adf40 4889 /* parse add command */
859c31df 4890 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4891 if (rc < 0)
bd4ba655 4892 goto err_out_module;
78cea76e 4893
9d3997fd
AE
4894 rbdc = rbd_get_client(ceph_opts);
4895 if (IS_ERR(rbdc)) {
4896 rc = PTR_ERR(rbdc);
0ddebc0c 4897 goto err_out_args;
9d3997fd 4898 }
c53d5893 4899 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4900
602adf40 4901 /* pick the pool */
9d3997fd 4902 osdc = &rbdc->client->osdc;
859c31df 4903 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4904 if (rc < 0)
4905 goto err_out_client;
c0cd10db 4906 spec->pool_id = (u64)rc;
859c31df 4907
0903e875
AE
4908 /* The ceph file layout needs to fit pool id in 32 bits */
4909
c0cd10db
AE
4910 if (spec->pool_id > (u64)U32_MAX) {
4911 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4912 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4913 rc = -EIO;
4914 goto err_out_client;
4915 }
4916
c53d5893 4917 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4918 if (!rbd_dev)
4919 goto err_out_client;
c53d5893
AE
4920 rbdc = NULL; /* rbd_dev now owns this */
4921 spec = NULL; /* rbd_dev now owns this */
602adf40 4922
bd4ba655 4923 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4924 kfree(rbd_opts);
4925 rbd_opts = NULL; /* done with this */
bd4ba655 4926
71f293e2 4927 rc = rbd_dev_image_probe(rbd_dev);
a30b71b9 4928 if (rc < 0)
c53d5893 4929 goto err_out_rbd_dev;
05fd6f6f 4930
b536f69a
AE
4931 rc = rbd_dev_device_setup(rbd_dev);
4932 if (!rc)
4933 return count;
4934
4935 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4936err_out_rbd_dev:
4937 rbd_dev_destroy(rbd_dev);
bd4ba655 4938err_out_client:
9d3997fd 4939 rbd_put_client(rbdc);
0ddebc0c 4940err_out_args:
78cea76e
AE
4941 if (ceph_opts)
4942 ceph_destroy_options(ceph_opts);
4e9afeba 4943 kfree(rbd_opts);
859c31df 4944 rbd_spec_put(spec);
bd4ba655
AE
4945err_out_module:
4946 module_put(THIS_MODULE);
27cc2594 4947
602adf40 4948 dout("Error adding device %s\n", buf);
27cc2594 4949
c0cd10db 4950 return (ssize_t)rc;
602adf40
YS
4951}
4952
de71a297 4953static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4954{
4955 struct list_head *tmp;
4956 struct rbd_device *rbd_dev;
4957
e124a82f 4958 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4959 list_for_each(tmp, &rbd_dev_list) {
4960 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4961 if (rbd_dev->dev_id == dev_id) {
e124a82f 4962 spin_unlock(&rbd_dev_list_lock);
602adf40 4963 return rbd_dev;
e124a82f 4964 }
602adf40 4965 }
e124a82f 4966 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4967 return NULL;
4968}
4969
200a6a8b 4970static void rbd_dev_device_release(struct device *dev)
602adf40 4971{
593a9e7b 4972 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4973
602adf40 4974 rbd_free_disk(rbd_dev);
200a6a8b
AE
4975 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4976 rbd_dev_clear_mapping(rbd_dev);
602adf40 4977 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4978 rbd_dev->major = 0;
e2839308 4979 rbd_dev_id_put(rbd_dev);
d1cf5788 4980 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
4981}
4982
05a46afd
AE
4983static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4984{
ad945fc1 4985 while (rbd_dev->parent) {
05a46afd
AE
4986 struct rbd_device *first = rbd_dev;
4987 struct rbd_device *second = first->parent;
4988 struct rbd_device *third;
4989
4990 /*
4991 * Follow to the parent with no grandparent and
4992 * remove it.
4993 */
4994 while (second && (third = second->parent)) {
4995 first = second;
4996 second = third;
4997 }
ad945fc1 4998 rbd_assert(second);
8ad42cd0 4999 rbd_dev_image_release(second);
ad945fc1
AE
5000 first->parent = NULL;
5001 first->parent_overlap = 0;
5002
5003 rbd_assert(first->parent_spec);
05a46afd
AE
5004 rbd_spec_put(first->parent_spec);
5005 first->parent_spec = NULL;
05a46afd
AE
5006 }
5007}
5008
dfc5606d
YS
5009static ssize_t rbd_remove(struct bus_type *bus,
5010 const char *buf,
5011 size_t count)
602adf40
YS
5012{
5013 struct rbd_device *rbd_dev = NULL;
0d8189e1 5014 int target_id;
602adf40 5015 unsigned long ul;
0d8189e1 5016 int ret;
602adf40 5017
0d8189e1
AE
5018 ret = strict_strtoul(buf, 10, &ul);
5019 if (ret)
5020 return ret;
602adf40
YS
5021
5022 /* convert to int; abort if we lost anything in the conversion */
5023 target_id = (int) ul;
5024 if (target_id != ul)
5025 return -EINVAL;
5026
5027 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5028
5029 rbd_dev = __rbd_get_dev(target_id);
5030 if (!rbd_dev) {
5031 ret = -ENOENT;
5032 goto done;
42382b70
AE
5033 }
5034
a14ea269 5035 spin_lock_irq(&rbd_dev->lock);
b82d167b 5036 if (rbd_dev->open_count)
42382b70 5037 ret = -EBUSY;
b82d167b
AE
5038 else
5039 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 5040 spin_unlock_irq(&rbd_dev->lock);
b82d167b 5041 if (ret < 0)
42382b70 5042 goto done;
0d8189e1 5043 ret = count;
b480815a 5044 rbd_bus_del_dev(rbd_dev);
8ad42cd0 5045 rbd_dev_image_release(rbd_dev);
79ab7558 5046 module_put(THIS_MODULE);
602adf40
YS
5047done:
5048 mutex_unlock(&ctl_mutex);
aafb230e 5049
602adf40
YS
5050 return ret;
5051}
5052
602adf40
YS
5053/*
5054 * create control files in sysfs
dfc5606d 5055 * /sys/bus/rbd/...
602adf40
YS
5056 */
5057static int rbd_sysfs_init(void)
5058{
dfc5606d 5059 int ret;
602adf40 5060
fed4c143 5061 ret = device_register(&rbd_root_dev);
21079786 5062 if (ret < 0)
dfc5606d 5063 return ret;
602adf40 5064
fed4c143
AE
5065 ret = bus_register(&rbd_bus_type);
5066 if (ret < 0)
5067 device_unregister(&rbd_root_dev);
602adf40 5068
602adf40
YS
5069 return ret;
5070}
5071
5072static void rbd_sysfs_cleanup(void)
5073{
dfc5606d 5074 bus_unregister(&rbd_bus_type);
fed4c143 5075 device_unregister(&rbd_root_dev);
602adf40
YS
5076}
5077
cc344fa1 5078static int __init rbd_init(void)
602adf40
YS
5079{
5080 int rc;
5081
1e32d34c
AE
5082 if (!libceph_compatible(NULL)) {
5083 rbd_warn(NULL, "libceph incompatibility (quitting)");
5084
5085 return -EINVAL;
5086 }
602adf40
YS
5087 rc = rbd_sysfs_init();
5088 if (rc)
5089 return rc;
f0f8cef5 5090 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
5091 return 0;
5092}
5093
cc344fa1 5094static void __exit rbd_exit(void)
602adf40
YS
5095{
5096 rbd_sysfs_cleanup();
5097}
5098
/* Module entry/exit points and module metadata. */
5099module_init(rbd_init);
5100module_exit(rbd_exit);
5101
5102MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5103MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5104MODULE_DESCRIPTION("rados block device");
5105
5106/* following authorship retained from original osdblk.c */
5107MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5108
5109MODULE_LICENSE("GPL");