rbd: retry watch re-registration periodically
[linux-block.git] / drivers / block / rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

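/*
 * Rough usage sketch (not code from this driver): these helpers pair
 * up to guard a counter that must never be revived once it has dropped
 * to zero, such as rbd_dev->parent_ref further below.
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		... parent reference successfully taken, use it ...
 *		atomic_dec_return_safe(&rbd_dev->parent_ref);
 *	}
 */
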
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
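
/*
 * A quick sanity check of that 4KB bound, assuming 8-byte snapshot ids:
 * 510 ids * 8 bytes = 4080 bytes, which leaves room for the snapshot
 * count and sequence number of the snapshot context within 4096 bytes.
 */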

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
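
/*
 * For illustration only (all values below are made up): mapping image
 * "foo" at snapshot "snap1" in pool "rbd" might yield a spec of
 *
 *	pool_id = 2,		pool_name = "rbd"
 *	image_id = "1028b4567",	image_name = "foo"
 *	snap_id = 4,		snap_name = "snap1"
 *
 * A parent image referenced by a clone is described the same way.
 */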

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

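/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, dev_id 3 maps
 * to minor 48 (3 << 4), and minors 48..63 (the whole device plus up to
 * 15 partitions) all map back to dev_id 3 (minor >> 4).
 */
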
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots don't allow writes */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

641
642#ifdef CONFIG_COMPAT
643static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
644 unsigned int cmd, unsigned long arg)
645{
646 return rbd_ioctl(bdev, mode, cmd, arg);
647}
648#endif /* CONFIG_COMPAT */
649
602adf40
YS
650static const struct block_device_operations rbd_bd_ops = {
651 .owner = THIS_MODULE,
652 .open = rbd_open,
dfc5606d 653 .release = rbd_release,
131fd9f6
GZ
654 .ioctl = rbd_ioctl,
655#ifdef CONFIG_COMPAT
656 .compat_ioctl = rbd_compat_ioctl,
657#endif
602adf40
YS
658};
659
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
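
/*
 * Illustrative example (hypothetical values): a map request passing
 * the option string "queue_depth=128,ro" would invoke this parser
 * once per option, setting rbd_opts->queue_depth = 128 and then
 * rbd_opts->read_only = true.
 */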

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node.  If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

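/*
 * Example of the order checks above, with assumed values: order 22
 * (the common 4 MiB object size) passes; order 8 (256-byte objects,
 * smaller than a 512-byte sector) is rejected, as is order 32 and up
 * (1 << order would no longer fit in an int).
 */
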
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

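/*
 * Worked example, with made-up ids: for snaps[] = { 12, 8, 5 } (newest
 * first), looking up snap_id 8 returns index 1 and snap_id 5 returns
 * index 2, while snap_id 7 returns BAD_SNAP_INDEX since bsearch()
 * finds no match in the descending array.
 */
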
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

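/*
 * Worked example, assuming obj_order 22 (4 MiB segments) and a format 2
 * image with the made-up object prefix "rbd_data.102ab4567": image
 * offset 0x900000 (9 MiB) falls in segment 2, named
 * "rbd_data.102ab4567.0000000000000002", at segment offset 0x100000,
 * and a 4 MiB request starting there is trimmed by rbd_segment_length()
 * to 3 MiB so that it does not cross into segment 3.
 */
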
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

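/*
 * Rough usage sketch (hypothetical caller): to split the data of one
 * bio chain across consecutive object requests, each taking "length"
 * bytes:
 *
 *	struct bio *bio = ...;		// head of the source chain
 *	unsigned int offset = 0;	// offset into that bio
 *
 *	clone = bio_chain_clone_range(&bio, &offset, length, GFP_NOIO);
 *
 * After the call, bio/offset point at the first un-cloned byte, ready
 * to be passed in again for the next piece of the request.
 */
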
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case we ignore the response that
 * arrives later.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

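/*
 * Illustrative sequence (not code from this section): a layered write
 * first checks whether the target object exists, then records the
 * answer and acts on it:
 *
 *	obj_request_existence_set(stat_request, result == 0);
 *	if (obj_request_known_test(stat_request) &&
 *	    !obj_request_exists_test(stat_request))
 *		... object absent: read from the parent and copy up ...
 *
 * EXISTS is set before KNOWN, followed by a memory barrier, so a
 * reader that sees KNOWN set also sees the EXISTS value that goes
 * with it.
 */
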
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 *
 * @timeout: in jiffies, 0 means "wait forever"
 */
static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
				  unsigned long timeout)
{
	long ret;

	dout("%s %p\n", __func__, obj_request);
	ret = wait_for_completion_interruptible_timeout(
					&obj_request->completion,
					ceph_timeout_jiffies(timeout));
	if (ret <= 0) {
		if (ret == 0)
			ret = -ETIMEDOUT;
		rbd_obj_request_end(obj_request);
	} else {
		ret = 0;
	}

	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
	return ret;
}

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return __rbd_obj_request_wait(obj_request, 0);
}

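/*
 * Rough usage sketch (hypothetical caller): synchronous users submit
 * and then block until the osd request completes or is interrupted:
 *
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 *
 * Passing a nonzero timeout to __rbd_obj_request_wait() instead turns
 * an expired wait into -ETIMEDOUT and cancels the osd request.
 */
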
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it is not clear offhand which way is better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

bf0d5f50
AE
1749static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1750{
37206ee5
AE
1751 dout("%s: obj %p cb %p\n", __func__, obj_request,
1752 obj_request->callback);
bf0d5f50
AE
1753 if (obj_request->callback)
1754 obj_request->callback(obj_request);
788e2df3
AE
1755 else
1756 complete_all(&obj_request->completion);
bf0d5f50
AE
1757}
1758
c47f9371 1759static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1760{
57acbaa7 1761 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1762 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1763 bool layered = false;
1764
1765 if (obj_request_img_data_test(obj_request)) {
1766 img_request = obj_request->img_request;
1767 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1768 rbd_dev = img_request->rbd_dev;
57acbaa7 1769 }
8b3e1a56
AE
1770
1771 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1772 obj_request, img_request, obj_request->result,
1773 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1774 if (layered && obj_request->result == -ENOENT &&
1775 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1776 rbd_img_parent_read(obj_request);
1777 else if (img_request)
6e2a4505
AE
1778 rbd_img_obj_request_read_callback(obj_request);
1779 else
1780 obj_request_done_set(obj_request);
bf0d5f50
AE
1781}
1782
c47f9371 1783static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1784{
1b83bef2
SW
1785 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1786 obj_request->result, obj_request->length);
1787 /*
8b3e1a56
AE
1788 * There is no such thing as a successful short write. Set
1789 * it to our originally-requested length.
1b83bef2
SW
1790 */
1791 obj_request->xferred = obj_request->length;
07741308 1792 obj_request_done_set(obj_request);
bf0d5f50
AE
1793}
1794
90e98c52
GZ
1795static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1796{
1797 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1798 obj_request->result, obj_request->length);
1799 /*
1800 * There is no such thing as a successful short discard. Set
1801 * it to our originally-requested length.
1802 */
1803 obj_request->xferred = obj_request->length;
d0265de7
JD
1804 /* discarding a non-existent object is not a problem */
1805 if (obj_request->result == -ENOENT)
1806 obj_request->result = 0;
90e98c52
GZ
1807 obj_request_done_set(obj_request);
1808}
1809
fbfab539
AE
1810/*
1811 * For a simple stat call there's nothing to do. We'll do more if
1812 * this is part of a write sequence for a layered image.
1813 */
c47f9371 1814static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1815{
37206ee5 1816 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1817 obj_request_done_set(obj_request);
1818}
1819
2761713d
ID
1820static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1821{
1822 dout("%s: obj %p\n", __func__, obj_request);
1823
1824 if (obj_request_img_data_test(obj_request))
1825 rbd_osd_copyup_callback(obj_request);
1826 else
1827 obj_request_done_set(obj_request);
1828}
1829
85e084fe 1830static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1831{
1832 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1833 u16 opcode;
1834
85e084fe 1835 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1836 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1837 if (obj_request_img_data_test(obj_request)) {
1838 rbd_assert(obj_request->img_request);
1839 rbd_assert(obj_request->which != BAD_WHICH);
1840 } else {
1841 rbd_assert(obj_request->which == BAD_WHICH);
1842 }
bf0d5f50 1843
1b83bef2
SW
1844 if (osd_req->r_result < 0)
1845 obj_request->result = osd_req->r_result;
bf0d5f50 1846
c47f9371
AE
1847 /*
1848 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1849 * passed to the block layer, which just supports a 32-bit
1850 * length field.
c47f9371 1851 */
7665d85b 1852 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1853 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1854
79528734 1855 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1856 switch (opcode) {
1857 case CEPH_OSD_OP_READ:
c47f9371 1858 rbd_osd_read_callback(obj_request);
bf0d5f50 1859 break;
0ccd5926 1860 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1861 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1862 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1863 /* fall through */
bf0d5f50 1864 case CEPH_OSD_OP_WRITE:
e30b7577 1865 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1866 rbd_osd_write_callback(obj_request);
bf0d5f50 1867 break;
fbfab539 1868 case CEPH_OSD_OP_STAT:
c47f9371 1869 rbd_osd_stat_callback(obj_request);
fbfab539 1870 break;
90e98c52
GZ
1871 case CEPH_OSD_OP_DELETE:
1872 case CEPH_OSD_OP_TRUNCATE:
1873 case CEPH_OSD_OP_ZERO:
1874 rbd_osd_discard_callback(obj_request);
1875 break;
36be9a76 1876 case CEPH_OSD_OP_CALL:
2761713d
ID
1877 rbd_osd_call_callback(obj_request);
1878 break;
bf0d5f50 1879 default:
9584d508 1880 rbd_warn(NULL, "%s: unsupported op %hu",
bf0d5f50
AE
1881 obj_request->object_name, (unsigned short) opcode);
1882 break;
1883 }
1884
07741308 1885 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1886 rbd_obj_request_complete(obj_request);
1887}
1888
9d4df01f 1889static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1890{
1891 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1892 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1893
bb873b53
ID
1894 if (img_request)
1895 osd_req->r_snapid = img_request->snap_id;
9d4df01f
AE
1896}
1897
1898static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1899{
9d4df01f 1900 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1901
bb873b53
ID
1902 osd_req->r_mtime = CURRENT_TIME;
1903 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1904}
1905
0ccd5926
ID
1906/*
1907 * Create an osd request. A read request has one osd op (read).
1908 * A write request has either one (watch) or two (hint+write) osd ops.
1909 * (All rbd data writes are prefixed with an allocation hint op, but
1910 * technically osd watch is a write request, hence this distinction.)
1911 */
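/*
 * Editorial note (illustrative summary, not part of the driver): the
 * resulting op vectors look like
 *
 *	read:    [ CEPH_OSD_OP_READ ]
 *	write:   [ CEPH_OSD_OP_SETALLOCHINT, CEPH_OSD_OP_WRITE(FULL) ]
 *	discard: [ CEPH_OSD_OP_DELETE or _TRUNCATE or _ZERO ]
 *
 * matching the opcodes dispatched in rbd_osd_req_callback() above.
 */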
bf0d5f50
AE
1912static struct ceph_osd_request *rbd_osd_req_create(
1913 struct rbd_device *rbd_dev,
6d2940c8 1914 enum obj_operation_type op_type,
deb236b3 1915 unsigned int num_ops,
430c28c3 1916 struct rbd_obj_request *obj_request)
bf0d5f50 1917{
bf0d5f50
AE
1918 struct ceph_snap_context *snapc = NULL;
1919 struct ceph_osd_client *osdc;
1920 struct ceph_osd_request *osd_req;
bf0d5f50 1921
90e98c52
GZ
1922 if (obj_request_img_data_test(obj_request) &&
1923 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1924 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1925 if (op_type == OBJ_OP_WRITE) {
1926 rbd_assert(img_request_write_test(img_request));
1927 } else {
1928 rbd_assert(img_request_discard_test(img_request));
1929 }
6d2940c8 1930 snapc = img_request->snapc;
bf0d5f50
AE
1931 }
1932
6d2940c8 1933 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3
ID
1934
1935 /* Allocate and initialize the request, for the num_ops ops */
bf0d5f50
AE
1936
1937 osdc = &rbd_dev->rbd_client->client->osdc;
deb236b3 1938 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2224d879 1939 GFP_NOIO);
bf0d5f50 1940 if (!osd_req)
13d1ad16 1941 goto fail;
bf0d5f50 1942
90e98c52 1943 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
bf0d5f50 1944 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1945 else
bf0d5f50 1946 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
1947
1948 osd_req->r_callback = rbd_osd_req_callback;
1949 osd_req->r_priv = obj_request;
1950
7627151e 1951 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
1952 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
1953 obj_request->object_name))
1954 goto fail;
bf0d5f50 1955
13d1ad16
ID
1956 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
1957 goto fail;
1958
bf0d5f50 1959 return osd_req;
13d1ad16
ID
1960
1961fail:
1962 ceph_osdc_put_request(osd_req);
1963 return NULL;
bf0d5f50
AE
1964}
1965
0eefd470 1966/*
d3246fb0
JD
1967 * Create a copyup osd request based on the information in the object
1968 * request supplied. A copyup request has two or three osd ops, a
1969 * copyup method call, potentially a hint op, and a write or truncate
1970 * or zero op.
0eefd470
AE
1971 */
1972static struct ceph_osd_request *
1973rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1974{
1975 struct rbd_img_request *img_request;
1976 struct ceph_snap_context *snapc;
1977 struct rbd_device *rbd_dev;
1978 struct ceph_osd_client *osdc;
1979 struct ceph_osd_request *osd_req;
d3246fb0 1980 int num_osd_ops = 3;
0eefd470
AE
1981
1982 rbd_assert(obj_request_img_data_test(obj_request));
1983 img_request = obj_request->img_request;
1984 rbd_assert(img_request);
d3246fb0
JD
1985 rbd_assert(img_request_write_test(img_request) ||
1986 img_request_discard_test(img_request));
0eefd470 1987
d3246fb0
JD
1988 if (img_request_discard_test(img_request))
1989 num_osd_ops = 2;
1990
1991 /* Allocate and initialize the request, for all the ops */
0eefd470
AE
1992
1993 snapc = img_request->snapc;
1994 rbd_dev = img_request->rbd_dev;
1995 osdc = &rbd_dev->rbd_client->client->osdc;
d3246fb0 1996 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2224d879 1997 false, GFP_NOIO);
0eefd470 1998 if (!osd_req)
13d1ad16 1999 goto fail;
0eefd470
AE
2000
2001 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2002 osd_req->r_callback = rbd_osd_req_callback;
2003 osd_req->r_priv = obj_request;
2004
7627151e 2005 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2006 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2007 obj_request->object_name))
2008 goto fail;
0eefd470 2009
13d1ad16
ID
2010 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2011 goto fail;
2012
0eefd470 2013 return osd_req;
13d1ad16
ID
2014
2015fail:
2016 ceph_osdc_put_request(osd_req);
2017 return NULL;
0eefd470
AE
2018}
2019
2020
bf0d5f50
AE
2021static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2022{
2023 ceph_osdc_put_request(osd_req);
2024}
2025
2026/* object_name is assumed to be a non-null pointer and NUL-terminated */
2027
2028static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2029 u64 offset, u64 length,
2030 enum obj_request_type type)
2031{
2032 struct rbd_obj_request *obj_request;
2033 size_t size;
2034 char *name;
2035
2036 rbd_assert(obj_request_type_valid(type));
2037
2038 size = strlen(object_name) + 1;
5a60e876 2039 name = kmalloc(size, GFP_NOIO);
f907ad55 2040 if (!name)
bf0d5f50
AE
2041 return NULL;
2042
5a60e876 2043 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
f907ad55
AE
2044 if (!obj_request) {
2045 kfree(name);
2046 return NULL;
2047 }
2048
bf0d5f50
AE
2049 obj_request->object_name = memcpy(name, object_name, size);
2050 obj_request->offset = offset;
2051 obj_request->length = length;
926f9b3f 2052 obj_request->flags = 0;
bf0d5f50
AE
2053 obj_request->which = BAD_WHICH;
2054 obj_request->type = type;
2055 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2056 init_completion(&obj_request->completion);
bf0d5f50
AE
2057 kref_init(&obj_request->kref);
2058
37206ee5
AE
2059 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2060 offset, length, (int)type, obj_request);
2061
bf0d5f50
AE
2062 return obj_request;
2063}
2064
2065static void rbd_obj_request_destroy(struct kref *kref)
2066{
2067 struct rbd_obj_request *obj_request;
2068
2069 obj_request = container_of(kref, struct rbd_obj_request, kref);
2070
37206ee5
AE
2071 dout("%s: obj %p\n", __func__, obj_request);
2072
bf0d5f50
AE
2073 rbd_assert(obj_request->img_request == NULL);
2074 rbd_assert(obj_request->which == BAD_WHICH);
2075
2076 if (obj_request->osd_req)
2077 rbd_osd_req_destroy(obj_request->osd_req);
2078
2079 rbd_assert(obj_request_type_valid(obj_request->type));
2080 switch (obj_request->type) {
9969ebc5
AE
2081 case OBJ_REQUEST_NODATA:
2082 break; /* Nothing to do */
bf0d5f50
AE
2083 case OBJ_REQUEST_BIO:
2084 if (obj_request->bio_list)
2085 bio_chain_put(obj_request->bio_list);
2086 break;
788e2df3
AE
2087 case OBJ_REQUEST_PAGES:
2088 if (obj_request->pages)
2089 ceph_release_page_vector(obj_request->pages,
2090 obj_request->page_count);
2091 break;
bf0d5f50
AE
2092 }
2093
f907ad55 2094 kfree(obj_request->object_name);
868311b1
AE
2095 obj_request->object_name = NULL;
2096 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2097}
2098
fb65d228
AE
2099/* It's OK to call this for a device with no parent */
2100
2101static void rbd_spec_put(struct rbd_spec *spec);
2102static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2103{
2104 rbd_dev_remove_parent(rbd_dev);
2105 rbd_spec_put(rbd_dev->parent_spec);
2106 rbd_dev->parent_spec = NULL;
2107 rbd_dev->parent_overlap = 0;
2108}
2109
a2acd00e
AE
2110/*
2111 * Parent image reference counting is used to determine when an
2112 * image's parent fields can be safely torn down--after there are no
2113 * more in-flight requests to the parent image. When the last
2114 * reference is dropped, cleaning them up is safe.
2115 */
2116static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2117{
2118 int counter;
2119
2120 if (!rbd_dev->parent_spec)
2121 return;
2122
2123 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2124 if (counter > 0)
2125 return;
2126
2127 /* Last reference; clean up parent data structures */
2128
2129 if (!counter)
2130 rbd_dev_unparent(rbd_dev);
2131 else
9584d508 2132 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2133}
2134
2135/*
2136 * If an image has a non-zero parent overlap, get a reference to its
2137 * parent.
2138 *
2139 * Returns true if the rbd device has a parent with a non-zero
2140 * overlap and a reference for it was successfully taken, or
2141 * false otherwise.
2142 */
2143static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2144{
ae43e9d0 2145 int counter = 0;
a2acd00e
AE
2146
2147 if (!rbd_dev->parent_spec)
2148 return false;
2149
ae43e9d0
ID
2150 down_read(&rbd_dev->header_rwsem);
2151 if (rbd_dev->parent_overlap)
2152 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2153 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2154
2155 if (counter < 0)
9584d508 2156 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2157
ae43e9d0 2158 return counter > 0;
a2acd00e
AE
2159}
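/*
 * Editorial sketch (hypothetical caller, not part of the driver):
 * a get/put pair brackets any use of the parent fields, e.g.
 *
 *	if (rbd_dev_parent_get(rbd_dev)) {
 *		... rbd_dev->parent and parent_overlap are stable ...
 *		rbd_dev_parent_put(rbd_dev);
 *	}
 *
 * rbd_img_request_create() and rbd_img_request_destroy() below form
 * exactly this pair around the lifetime of a layered image request.
 */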
2160
bf0d5f50
AE
2161/*
2162 * Caller is responsible for filling in the list of object requests
2163 * that comprises the image request, and the Linux request pointer
2164 * (if there is one).
2165 */
cc344fa1
AE
2166static struct rbd_img_request *rbd_img_request_create(
2167 struct rbd_device *rbd_dev,
bf0d5f50 2168 u64 offset, u64 length,
6d2940c8 2169 enum obj_operation_type op_type,
4e752f0a 2170 struct ceph_snap_context *snapc)
bf0d5f50
AE
2171{
2172 struct rbd_img_request *img_request;
bf0d5f50 2173
7a716aac 2174 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2175 if (!img_request)
2176 return NULL;
2177
bf0d5f50
AE
2178 img_request->rq = NULL;
2179 img_request->rbd_dev = rbd_dev;
2180 img_request->offset = offset;
2181 img_request->length = length;
0c425248 2182 img_request->flags = 0;
90e98c52
GZ
2183 if (op_type == OBJ_OP_DISCARD) {
2184 img_request_discard_set(img_request);
2185 img_request->snapc = snapc;
2186 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2187 img_request_write_set(img_request);
4e752f0a 2188 img_request->snapc = snapc;
0c425248 2189 } else {
bf0d5f50 2190 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2191 }
a2acd00e 2192 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2193 img_request_layered_set(img_request);
bf0d5f50
AE
2194 spin_lock_init(&img_request->completion_lock);
2195 img_request->next_completion = 0;
2196 img_request->callback = NULL;
a5a337d4 2197 img_request->result = 0;
bf0d5f50
AE
2198 img_request->obj_request_count = 0;
2199 INIT_LIST_HEAD(&img_request->obj_requests);
2200 kref_init(&img_request->kref);
2201
37206ee5 2202 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2203 obj_op_name(op_type), offset, length, img_request);
37206ee5 2204
bf0d5f50
AE
2205 return img_request;
2206}
2207
2208static void rbd_img_request_destroy(struct kref *kref)
2209{
2210 struct rbd_img_request *img_request;
2211 struct rbd_obj_request *obj_request;
2212 struct rbd_obj_request *next_obj_request;
2213
2214 img_request = container_of(kref, struct rbd_img_request, kref);
2215
37206ee5
AE
2216 dout("%s: img %p\n", __func__, img_request);
2217
bf0d5f50
AE
2218 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2219 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2220 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2221
a2acd00e
AE
2222 if (img_request_layered_test(img_request)) {
2223 img_request_layered_clear(img_request);
2224 rbd_dev_parent_put(img_request->rbd_dev);
2225 }
2226
bef95455
JD
2227 if (img_request_write_test(img_request) ||
2228 img_request_discard_test(img_request))
812164f8 2229 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2230
1c2a9dfe 2231 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2232}
2233
e93f3152
AE
2234static struct rbd_img_request *rbd_parent_request_create(
2235 struct rbd_obj_request *obj_request,
2236 u64 img_offset, u64 length)
2237{
2238 struct rbd_img_request *parent_request;
2239 struct rbd_device *rbd_dev;
2240
2241 rbd_assert(obj_request->img_request);
2242 rbd_dev = obj_request->img_request->rbd_dev;
2243
4e752f0a 2244 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2245 length, OBJ_OP_READ, NULL);
e93f3152
AE
2246 if (!parent_request)
2247 return NULL;
2248
2249 img_request_child_set(parent_request);
2250 rbd_obj_request_get(obj_request);
2251 parent_request->obj_request = obj_request;
2252
2253 return parent_request;
2254}
2255
2256static void rbd_parent_request_destroy(struct kref *kref)
2257{
2258 struct rbd_img_request *parent_request;
2259 struct rbd_obj_request *orig_request;
2260
2261 parent_request = container_of(kref, struct rbd_img_request, kref);
2262 orig_request = parent_request->obj_request;
2263
2264 parent_request->obj_request = NULL;
2265 rbd_obj_request_put(orig_request);
2266 img_request_child_clear(parent_request);
2267
2268 rbd_img_request_destroy(kref);
2269}
2270
1217857f
AE
2271static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2272{
6365d33a 2273 struct rbd_img_request *img_request;
1217857f
AE
2274 unsigned int xferred;
2275 int result;
8b3e1a56 2276 bool more;
1217857f 2277
6365d33a
AE
2278 rbd_assert(obj_request_img_data_test(obj_request));
2279 img_request = obj_request->img_request;
2280
1217857f
AE
2281 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2282 xferred = (unsigned int)obj_request->xferred;
2283 result = obj_request->result;
2284 if (result) {
2285 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2286 enum obj_operation_type op_type;
2287
90e98c52
GZ
2288 if (img_request_discard_test(img_request))
2289 op_type = OBJ_OP_DISCARD;
2290 else if (img_request_write_test(img_request))
2291 op_type = OBJ_OP_WRITE;
2292 else
2293 op_type = OBJ_OP_READ;
1217857f 2294
9584d508 2295 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2296 obj_op_name(op_type), obj_request->length,
2297 obj_request->img_offset, obj_request->offset);
9584d508 2298 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2299 result, xferred);
2300 if (!img_request->result)
2301 img_request->result = result;
082a75da
ID
2302 /*
2303 * Need to end I/O on the entire obj_request worth of
2304 * bytes in case of error.
2305 */
2306 xferred = obj_request->length;
1217857f
AE
2307 }
2308
f1a4739f
AE
2309 /* Image object requests don't own their page array */
2310
2311 if (obj_request->type == OBJ_REQUEST_PAGES) {
2312 obj_request->pages = NULL;
2313 obj_request->page_count = 0;
2314 }
2315
8b3e1a56
AE
2316 if (img_request_child_test(img_request)) {
2317 rbd_assert(img_request->obj_request != NULL);
2318 more = obj_request->which < img_request->obj_request_count - 1;
2319 } else {
2320 rbd_assert(img_request->rq != NULL);
7ad18afa
CH
2321
2322 more = blk_update_request(img_request->rq, result, xferred);
2323 if (!more)
2324 __blk_mq_end_request(img_request->rq, result);
8b3e1a56
AE
2325 }
2326
2327 return more;
1217857f
AE
2328}
2329
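/*
 * Editorial note: object requests may complete in any order, but the
 * block layer must see completions in offset order.  The loop below
 * therefore advances next_completion only across the longest prefix
 * of consecutively-completed object requests and stops at the first
 * one still in flight; that request's own completion re-enters here
 * and resumes the scan.
 */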
2169238d
AE
2330static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2331{
2332 struct rbd_img_request *img_request;
2333 u32 which = obj_request->which;
2334 bool more = true;
2335
6365d33a 2336 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2337 img_request = obj_request->img_request;
2338
2339 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2340 rbd_assert(img_request != NULL);
2169238d
AE
2341 rbd_assert(img_request->obj_request_count > 0);
2342 rbd_assert(which != BAD_WHICH);
2343 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2344
2345 spin_lock_irq(&img_request->completion_lock);
2346 if (which != img_request->next_completion)
2347 goto out;
2348
2349 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2350 rbd_assert(more);
2351 rbd_assert(which < img_request->obj_request_count);
2352
2353 if (!obj_request_done_test(obj_request))
2354 break;
1217857f 2355 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2356 which++;
2357 }
2358
2359 rbd_assert(more ^ (which == img_request->obj_request_count));
2360 img_request->next_completion = which;
2361out:
2362 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2363 rbd_img_request_put(img_request);
2169238d
AE
2364
2365 if (!more)
2366 rbd_img_request_complete(img_request);
2367}
2368
3b434a2a
JD
2369/*
2370 * Add individual osd ops to the given ceph_osd_request and prepare
 2371 * them for submission. num_ops is the number of osd
 2372 * operations already added to the osd request.
2373 */
2374static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2375 struct ceph_osd_request *osd_request,
2376 enum obj_operation_type op_type,
2377 unsigned int num_ops)
2378{
2379 struct rbd_img_request *img_request = obj_request->img_request;
2380 struct rbd_device *rbd_dev = img_request->rbd_dev;
2381 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2382 u64 offset = obj_request->offset;
2383 u64 length = obj_request->length;
2384 u64 img_end;
2385 u16 opcode;
2386
2387 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2388 if (!offset && length == object_size &&
2389 (!img_request_layered_test(img_request) ||
2390 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2391 opcode = CEPH_OSD_OP_DELETE;
2392 } else if ((offset + length == object_size)) {
2393 opcode = CEPH_OSD_OP_TRUNCATE;
2394 } else {
2395 down_read(&rbd_dev->header_rwsem);
2396 img_end = rbd_dev->header.image_size;
2397 up_read(&rbd_dev->header_rwsem);
2398
2399 if (obj_request->img_offset + length == img_end)
2400 opcode = CEPH_OSD_OP_TRUNCATE;
2401 else
2402 opcode = CEPH_OSD_OP_ZERO;
2403 }
2404 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2405 if (!offset && length == object_size)
2406 opcode = CEPH_OSD_OP_WRITEFULL;
2407 else
2408 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2409 osd_req_op_alloc_hint_init(osd_request, num_ops,
2410 object_size, object_size);
2411 num_ops++;
2412 } else {
2413 opcode = CEPH_OSD_OP_READ;
2414 }
2415
7e868b6e 2416 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2417 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2418 else
2419 osd_req_op_extent_init(osd_request, num_ops, opcode,
2420 offset, length, 0, 0);
2421
3b434a2a
JD
2422 if (obj_request->type == OBJ_REQUEST_BIO)
2423 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2424 obj_request->bio_list, length);
2425 else if (obj_request->type == OBJ_REQUEST_PAGES)
2426 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2427 obj_request->pages, length,
2428 offset & ~PAGE_MASK, false, false);
2429
2430 /* Discards are also writes */
2431 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2432 rbd_osd_req_format_write(obj_request);
2433 else
2434 rbd_osd_req_format_read(obj_request);
2435}
2436
f1a4739f
AE
2437/*
2438 * Split up an image request into one or more object requests, each
2439 * to a different object. The "type" parameter indicates whether
2440 * "data_desc" is the pointer to the head of a list of bio
2441 * structures, or the base of a page array. In either case this
2442 * function assumes data_desc describes memory sufficient to hold
2443 * all data described by the image request.
2444 */
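/*
 * Editorial sketch (assumed helpers, defined earlier in this file):
 * each iteration of the loop below clamps the remaining extent to a
 * single power-of-two object, roughly
 *
 *	u64 seg_size = rbd_obj_bytes(&rbd_dev->header);
 *	u64 seg_off  = img_offset & (seg_size - 1);
 *	u64 seg_len  = min_t(u64, resid, seg_size - seg_off);
 *
 * which is what rbd_segment_offset() and rbd_segment_length() are
 * expected to compute.
 */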
2445static int rbd_img_request_fill(struct rbd_img_request *img_request,
2446 enum obj_request_type type,
2447 void *data_desc)
bf0d5f50
AE
2448{
2449 struct rbd_device *rbd_dev = img_request->rbd_dev;
2450 struct rbd_obj_request *obj_request = NULL;
2451 struct rbd_obj_request *next_obj_request;
a158073c 2452 struct bio *bio_list = NULL;
f1a4739f 2453 unsigned int bio_offset = 0;
a158073c 2454 struct page **pages = NULL;
6d2940c8 2455 enum obj_operation_type op_type;
7da22d29 2456 u64 img_offset;
bf0d5f50 2457 u64 resid;
bf0d5f50 2458
f1a4739f
AE
2459 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2460 (int)type, data_desc);
37206ee5 2461
7da22d29 2462 img_offset = img_request->offset;
bf0d5f50 2463 resid = img_request->length;
4dda41d3 2464 rbd_assert(resid > 0);
3b434a2a 2465 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2466
2467 if (type == OBJ_REQUEST_BIO) {
2468 bio_list = data_desc;
4f024f37
KO
2469 rbd_assert(img_offset ==
2470 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2471 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2472 pages = data_desc;
2473 }
2474
bf0d5f50 2475 while (resid) {
2fa12320 2476 struct ceph_osd_request *osd_req;
bf0d5f50 2477 const char *object_name;
bf0d5f50
AE
2478 u64 offset;
2479 u64 length;
2480
7da22d29 2481 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2482 if (!object_name)
2483 goto out_unwind;
7da22d29
AE
2484 offset = rbd_segment_offset(rbd_dev, img_offset);
2485 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2486 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2487 offset, length, type);
78c2a44a
AE
2488 /* object request has its own copy of the object name */
2489 rbd_segment_name_free(object_name);
bf0d5f50
AE
2490 if (!obj_request)
2491 goto out_unwind;
62054da6 2492
03507db6
JD
2493 /*
2494 * set obj_request->img_request before creating the
2495 * osd_request so that it gets the right snapc
2496 */
2497 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2498
f1a4739f
AE
2499 if (type == OBJ_REQUEST_BIO) {
2500 unsigned int clone_size;
2501
2502 rbd_assert(length <= (u64)UINT_MAX);
2503 clone_size = (unsigned int)length;
2504 obj_request->bio_list =
2505 bio_chain_clone_range(&bio_list,
2506 &bio_offset,
2507 clone_size,
2224d879 2508 GFP_NOIO);
f1a4739f 2509 if (!obj_request->bio_list)
62054da6 2510 goto out_unwind;
90e98c52 2511 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2512 unsigned int page_count;
2513
2514 obj_request->pages = pages;
2515 page_count = (u32)calc_pages_for(offset, length);
2516 obj_request->page_count = page_count;
2517 if ((offset + length) & ~PAGE_MASK)
2518 page_count--; /* more on last page */
2519 pages += page_count;
2520 }
bf0d5f50 2521
6d2940c8
GZ
2522 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2523 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2524 obj_request);
2fa12320 2525 if (!osd_req)
62054da6 2526 goto out_unwind;
3b434a2a 2527
2fa12320 2528 obj_request->osd_req = osd_req;
2169238d 2529 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2530 obj_request->img_offset = img_offset;
9d4df01f 2531
3b434a2a 2532 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2533
3b434a2a 2534 rbd_img_request_get(img_request);
bf0d5f50 2535
7da22d29 2536 img_offset += length;
bf0d5f50
AE
2537 resid -= length;
2538 }
2539
2540 return 0;
2541
bf0d5f50
AE
2542out_unwind:
2543 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2544 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2545
2546 return -ENOMEM;
2547}
2548
0eefd470 2549static void
2761713d 2550rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2551{
2552 struct rbd_img_request *img_request;
2553 struct rbd_device *rbd_dev;
ebda6408 2554 struct page **pages;
0eefd470
AE
2555 u32 page_count;
2556
2761713d
ID
2557 dout("%s: obj %p\n", __func__, obj_request);
2558
d3246fb0
JD
2559 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2560 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2561 rbd_assert(obj_request_img_data_test(obj_request));
2562 img_request = obj_request->img_request;
2563 rbd_assert(img_request);
2564
2565 rbd_dev = img_request->rbd_dev;
2566 rbd_assert(rbd_dev);
0eefd470 2567
ebda6408
AE
2568 pages = obj_request->copyup_pages;
2569 rbd_assert(pages != NULL);
0eefd470 2570 obj_request->copyup_pages = NULL;
ebda6408
AE
2571 page_count = obj_request->copyup_page_count;
2572 rbd_assert(page_count);
2573 obj_request->copyup_page_count = 0;
2574 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2575
2576 /*
2577 * We want the transfer count to reflect the size of the
2578 * original write request. There is no such thing as a
2579 * successful short write, so if the request was successful
2580 * we can just set it to the originally-requested length.
2581 */
2582 if (!obj_request->result)
2583 obj_request->xferred = obj_request->length;
2584
2761713d 2585 obj_request_done_set(obj_request);
0eefd470
AE
2586}
2587
3d7efd18
AE
2588static void
2589rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2590{
2591 struct rbd_obj_request *orig_request;
0eefd470
AE
2592 struct ceph_osd_request *osd_req;
2593 struct ceph_osd_client *osdc;
2594 struct rbd_device *rbd_dev;
3d7efd18 2595 struct page **pages;
d3246fb0 2596 enum obj_operation_type op_type;
ebda6408 2597 u32 page_count;
bbea1c1a 2598 int img_result;
ebda6408 2599 u64 parent_length;
3d7efd18
AE
2600
2601 rbd_assert(img_request_child_test(img_request));
2602
2603 /* First get what we need from the image request */
2604
2605 pages = img_request->copyup_pages;
2606 rbd_assert(pages != NULL);
2607 img_request->copyup_pages = NULL;
ebda6408
AE
2608 page_count = img_request->copyup_page_count;
2609 rbd_assert(page_count);
2610 img_request->copyup_page_count = 0;
3d7efd18
AE
2611
2612 orig_request = img_request->obj_request;
2613 rbd_assert(orig_request != NULL);
b91f09f1 2614 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2615 img_result = img_request->result;
ebda6408
AE
2616 parent_length = img_request->length;
2617 rbd_assert(parent_length == img_request->xferred);
91c6febb 2618 rbd_img_request_put(img_request);
3d7efd18 2619
91c6febb
AE
2620 rbd_assert(orig_request->img_request);
2621 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2622 rbd_assert(rbd_dev);
0eefd470 2623
bbea1c1a
AE
2624 /*
2625 * If the overlap has become 0 (most likely because the
2626 * image has been flattened) we need to free the pages
2627 * and re-submit the original write request.
2628 */
2629 if (!rbd_dev->parent_overlap) {
2630 struct ceph_osd_client *osdc;
3d7efd18 2631
bbea1c1a
AE
2632 ceph_release_page_vector(pages, page_count);
2633 osdc = &rbd_dev->rbd_client->client->osdc;
2634 img_result = rbd_obj_request_submit(osdc, orig_request);
2635 if (!img_result)
2636 return;
2637 }
0eefd470 2638
bbea1c1a 2639 if (img_result)
0eefd470 2640 goto out_err;
0eefd470 2641
8785b1d4
AE
2642 /*
 2643 * The original osd request is of no use to us any more.
0ccd5926 2644 * We need a new one that can hold the three ops in a copyup
8785b1d4
AE
2645 * request. Allocate the new copyup osd request for the
2646 * original request, and release the old one.
2647 */
bbea1c1a 2648 img_result = -ENOMEM;
0eefd470
AE
2649 osd_req = rbd_osd_req_create_copyup(orig_request);
2650 if (!osd_req)
2651 goto out_err;
8785b1d4 2652 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2653 orig_request->osd_req = osd_req;
2654 orig_request->copyup_pages = pages;
ebda6408 2655 orig_request->copyup_page_count = page_count;
3d7efd18 2656
0eefd470 2657 /* Initialize the copyup op */
3d7efd18 2658
0eefd470 2659 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2660 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2661 false, false);
3d7efd18 2662
d3246fb0 2663 /* Add the other op(s) */
0eefd470 2664
d3246fb0
JD
2665 op_type = rbd_img_request_op_type(orig_request->img_request);
2666 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2667
2668 /* All set, send it off. */
2669
0eefd470 2670 osdc = &rbd_dev->rbd_client->client->osdc;
bbea1c1a
AE
2671 img_result = rbd_obj_request_submit(osdc, orig_request);
2672 if (!img_result)
0eefd470
AE
2673 return;
2674out_err:
2675 /* Record the error code and complete the request */
2676
bbea1c1a 2677 orig_request->result = img_result;
0eefd470
AE
2678 orig_request->xferred = 0;
2679 obj_request_done_set(orig_request);
2680 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2681}
2682
2683/*
2684 * Read from the parent image the range of data that covers the
2685 * entire target of the given object request. This is used for
2686 * satisfying a layered image write request when the target of an
2687 * object request from the image request does not exist.
2688 *
2689 * A page array big enough to hold the returned data is allocated
2690 * and supplied to rbd_img_request_fill() as the "data descriptor."
2691 * When the read completes, this page array will be transferred to
2692 * the original object request for the copyup operation.
2693 *
2694 * If an error occurs, record it as the result of the original
2695 * object request and mark it done so it gets completed.
2696 */
2697static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2698{
2699 struct rbd_img_request *img_request = NULL;
2700 struct rbd_img_request *parent_request = NULL;
2701 struct rbd_device *rbd_dev;
2702 u64 img_offset;
2703 u64 length;
2704 struct page **pages = NULL;
2705 u32 page_count;
2706 int result;
2707
2708 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2709 rbd_assert(obj_request_type_valid(obj_request->type));
3d7efd18
AE
2710
2711 img_request = obj_request->img_request;
2712 rbd_assert(img_request != NULL);
2713 rbd_dev = img_request->rbd_dev;
2714 rbd_assert(rbd_dev->parent != NULL);
2715
2716 /*
2717 * Determine the byte range covered by the object in the
2718 * child image to which the original request was to be sent.
2719 */
2720 img_offset = obj_request->img_offset - obj_request->offset;
2721 length = (u64)1 << rbd_dev->header.obj_order;
2722
a9e8ba2c
AE
2723 /*
2724 * There is no defined parent data beyond the parent
2725 * overlap, so limit what we read at that boundary if
2726 * necessary.
2727 */
2728 if (img_offset + length > rbd_dev->parent_overlap) {
2729 rbd_assert(img_offset < rbd_dev->parent_overlap);
2730 length = rbd_dev->parent_overlap - img_offset;
2731 }
2732
3d7efd18
AE
2733 /*
2734 * Allocate a page array big enough to receive the data read
2735 * from the parent.
2736 */
2737 page_count = (u32)calc_pages_for(0, length);
2738 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2739 if (IS_ERR(pages)) {
2740 result = PTR_ERR(pages);
2741 pages = NULL;
2742 goto out_err;
2743 }
2744
2745 result = -ENOMEM;
e93f3152
AE
2746 parent_request = rbd_parent_request_create(obj_request,
2747 img_offset, length);
3d7efd18
AE
2748 if (!parent_request)
2749 goto out_err;
3d7efd18
AE
2750
2751 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2752 if (result)
2753 goto out_err;
2754 parent_request->copyup_pages = pages;
ebda6408 2755 parent_request->copyup_page_count = page_count;
3d7efd18
AE
2756
2757 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2758 result = rbd_img_request_submit(parent_request);
2759 if (!result)
2760 return 0;
2761
2762 parent_request->copyup_pages = NULL;
ebda6408 2763 parent_request->copyup_page_count = 0;
3d7efd18
AE
2764 parent_request->obj_request = NULL;
2765 rbd_obj_request_put(obj_request);
2766out_err:
2767 if (pages)
2768 ceph_release_page_vector(pages, page_count);
2769 if (parent_request)
2770 rbd_img_request_put(parent_request);
2771 obj_request->result = result;
2772 obj_request->xferred = 0;
2773 obj_request_done_set(obj_request);
2774
2775 return result;
2776}
2777
c5b5ef6c
AE
2778static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2779{
c5b5ef6c 2780 struct rbd_obj_request *orig_request;
638f5abe 2781 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2782 int result;
2783
2784 rbd_assert(!obj_request_img_data_test(obj_request));
2785
2786 /*
2787 * All we need from the object request is the original
2788 * request and the result of the STAT op. Grab those, then
2789 * we're done with the request.
2790 */
2791 orig_request = obj_request->obj_request;
2792 obj_request->obj_request = NULL;
912c317d 2793 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2794 rbd_assert(orig_request);
2795 rbd_assert(orig_request->img_request);
2796
2797 result = obj_request->result;
2798 obj_request->result = 0;
2799
2800 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2801 obj_request, orig_request, result,
2802 obj_request->xferred, obj_request->length);
2803 rbd_obj_request_put(obj_request);
2804
638f5abe
AE
2805 /*
2806 * If the overlap has become 0 (most likely because the
2807 * image has been flattened) we need to free the pages
2808 * and re-submit the original write request.
2809 */
2810 rbd_dev = orig_request->img_request->rbd_dev;
2811 if (!rbd_dev->parent_overlap) {
2812 struct ceph_osd_client *osdc;
2813
638f5abe
AE
2814 osdc = &rbd_dev->rbd_client->client->osdc;
2815 result = rbd_obj_request_submit(osdc, orig_request);
2816 if (!result)
2817 return;
2818 }
c5b5ef6c
AE
2819
2820 /*
2821 * Our only purpose here is to determine whether the object
2822 * exists, and we don't want to treat the non-existence as
2823 * an error. If something else comes back, transfer the
2824 * error to the original request and complete it now.
2825 */
2826 if (!result) {
2827 obj_request_existence_set(orig_request, true);
2828 } else if (result == -ENOENT) {
2829 obj_request_existence_set(orig_request, false);
2830 } else if (result) {
2831 orig_request->result = result;
3d7efd18 2832 goto out;
c5b5ef6c
AE
2833 }
2834
2835 /*
2836 * Resubmit the original request now that we have recorded
2837 * whether the target object exists.
2838 */
b454e36d 2839 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2840out:
c5b5ef6c
AE
2841 if (orig_request->result)
2842 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2843}
2844
2845static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2846{
2847 struct rbd_obj_request *stat_request;
2848 struct rbd_device *rbd_dev;
2849 struct ceph_osd_client *osdc;
2850 struct page **pages = NULL;
2851 u32 page_count;
2852 size_t size;
2853 int ret;
2854
2855 /*
2856 * The response data for a STAT call consists of:
2857 * le64 length;
2858 * struct {
2859 * le32 tv_sec;
2860 * le32 tv_nsec;
2861 * } mtime;
2862 */
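	/*
	 * Editorial sketch (illustrative only): the same layout as a
	 * packed struct, so size works out to 8 + 4 + 4 == 16 bytes:
	 *
	 *	struct stat_reply {
	 *		__le64 length;
	 *		struct {
	 *			__le32 tv_sec;
	 *			__le32 tv_nsec;
	 *		} mtime;
	 *	} __packed;
	 */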
2863 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2864 page_count = (u32)calc_pages_for(0, size);
2865 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2866 if (IS_ERR(pages))
2867 return PTR_ERR(pages);
2868
2869 ret = -ENOMEM;
2870 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2871 OBJ_REQUEST_PAGES);
2872 if (!stat_request)
2873 goto out;
2874
2875 rbd_obj_request_get(obj_request);
2876 stat_request->obj_request = obj_request;
2877 stat_request->pages = pages;
2878 stat_request->page_count = page_count;
2879
2880 rbd_assert(obj_request->img_request);
2881 rbd_dev = obj_request->img_request->rbd_dev;
6d2940c8 2882 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 2883 stat_request);
c5b5ef6c
AE
2884 if (!stat_request->osd_req)
2885 goto out;
2886 stat_request->callback = rbd_img_obj_exists_callback;
2887
144cba14 2888 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
c5b5ef6c
AE
2889 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2890 false, false);
9d4df01f 2891 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2892
2893 osdc = &rbd_dev->rbd_client->client->osdc;
2894 ret = rbd_obj_request_submit(osdc, stat_request);
2895out:
2896 if (ret)
2897 rbd_obj_request_put(obj_request);
2898
2899 return ret;
2900}
2901
70d045f6 2902static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d
AE
2903{
2904 struct rbd_img_request *img_request;
a9e8ba2c 2905 struct rbd_device *rbd_dev;
b454e36d
AE
2906
2907 rbd_assert(obj_request_img_data_test(obj_request));
2908
2909 img_request = obj_request->img_request;
2910 rbd_assert(img_request);
a9e8ba2c 2911 rbd_dev = img_request->rbd_dev;
b454e36d 2912
70d045f6 2913 /* Reads */
1c220881
JD
2914 if (!img_request_write_test(img_request) &&
2915 !img_request_discard_test(img_request))
70d045f6
ID
2916 return true;
2917
2918 /* Non-layered writes */
2919 if (!img_request_layered_test(img_request))
2920 return true;
2921
b454e36d 2922 /*
70d045f6
ID
2923 * Layered writes outside of the parent overlap range don't
2924 * share any data with the parent.
b454e36d 2925 */
70d045f6
ID
2926 if (!obj_request_overlaps_parent(obj_request))
2927 return true;
b454e36d 2928
c622d226
GZ
2929 /*
2930 * Entire-object layered writes - we will overwrite whatever
2931 * parent data there is anyway.
2932 */
2933 if (!obj_request->offset &&
2934 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2935 return true;
2936
70d045f6
ID
2937 /*
2938 * If the object is known to already exist, its parent data has
2939 * already been copied.
2940 */
2941 if (obj_request_known_test(obj_request) &&
2942 obj_request_exists_test(obj_request))
2943 return true;
2944
2945 return false;
2946}
2947
2948static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2949{
2950 if (img_obj_request_simple(obj_request)) {
b454e36d
AE
2951 struct rbd_device *rbd_dev;
2952 struct ceph_osd_client *osdc;
2953
2954 rbd_dev = obj_request->img_request->rbd_dev;
2955 osdc = &rbd_dev->rbd_client->client->osdc;
2956
2957 return rbd_obj_request_submit(osdc, obj_request);
2958 }
2959
2960 /*
3d7efd18
AE
2961 * It's a layered write. The target object might exist but
2962 * we may not know that yet. If we know it doesn't exist,
2963 * start by reading the data for the full target object from
2964 * the parent so we can use it for a copyup to the target.
b454e36d 2965 */
70d045f6 2966 if (obj_request_known_test(obj_request))
3d7efd18
AE
2967 return rbd_img_obj_parent_read_full(obj_request);
2968
2969 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2970
2971 return rbd_img_obj_exists_submit(obj_request);
2972}
2973
bf0d5f50
AE
2974static int rbd_img_request_submit(struct rbd_img_request *img_request)
2975{
bf0d5f50 2976 struct rbd_obj_request *obj_request;
46faeed4 2977 struct rbd_obj_request *next_obj_request;
663ae2cc 2978 int ret = 0;
bf0d5f50 2979
37206ee5 2980 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2981
663ae2cc
ID
2982 rbd_img_request_get(img_request);
2983 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 2984 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 2985 if (ret)
663ae2cc 2986 goto out_put_ireq;
bf0d5f50
AE
2987 }
2988
663ae2cc
ID
2989out_put_ireq:
2990 rbd_img_request_put(img_request);
2991 return ret;
bf0d5f50 2992}
8b3e1a56
AE
2993
2994static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2995{
2996 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2997 struct rbd_device *rbd_dev;
2998 u64 obj_end;
02c74fba
AE
2999 u64 img_xferred;
3000 int img_result;
8b3e1a56
AE
3001
3002 rbd_assert(img_request_child_test(img_request));
3003
02c74fba
AE
3004 /* First get what we need from the image request and release it */
3005
8b3e1a56 3006 obj_request = img_request->obj_request;
02c74fba
AE
3007 img_xferred = img_request->xferred;
3008 img_result = img_request->result;
3009 rbd_img_request_put(img_request);
3010
3011 /*
3012 * If the overlap has become 0 (most likely because the
3013 * image has been flattened) we need to re-submit the
3014 * original request.
3015 */
a9e8ba2c
AE
3016 rbd_assert(obj_request);
3017 rbd_assert(obj_request->img_request);
02c74fba
AE
3018 rbd_dev = obj_request->img_request->rbd_dev;
3019 if (!rbd_dev->parent_overlap) {
3020 struct ceph_osd_client *osdc;
3021
3022 osdc = &rbd_dev->rbd_client->client->osdc;
3023 img_result = rbd_obj_request_submit(osdc, obj_request);
3024 if (!img_result)
3025 return;
3026 }
a9e8ba2c 3027
02c74fba 3028 obj_request->result = img_result;
a9e8ba2c
AE
3029 if (obj_request->result)
3030 goto out;
3031
3032 /*
3033 * We need to zero anything beyond the parent overlap
3034 * boundary. Since rbd_img_obj_request_read_callback()
3035 * will zero anything beyond the end of a short read, an
3036 * easy way to do this is to pretend the data from the
3037 * parent came up short--ending at the overlap boundary.
3038 */
3039 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3040 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
3041 if (obj_end > rbd_dev->parent_overlap) {
3042 u64 xferred = 0;
3043
3044 if (obj_request->img_offset < rbd_dev->parent_overlap)
3045 xferred = rbd_dev->parent_overlap -
3046 obj_request->img_offset;
8b3e1a56 3047
02c74fba 3048 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 3049 } else {
02c74fba 3050 obj_request->xferred = img_xferred;
a9e8ba2c
AE
3051 }
3052out:
8b3e1a56
AE
3053 rbd_img_obj_request_read_callback(obj_request);
3054 rbd_obj_request_complete(obj_request);
3055}
3056
3057static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3058{
8b3e1a56
AE
3059 struct rbd_img_request *img_request;
3060 int result;
3061
3062 rbd_assert(obj_request_img_data_test(obj_request));
3063 rbd_assert(obj_request->img_request != NULL);
3064 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3065 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3066
8b3e1a56 3067 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3068 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3069 obj_request->img_offset,
e93f3152 3070 obj_request->length);
8b3e1a56
AE
3071 result = -ENOMEM;
3072 if (!img_request)
3073 goto out_err;
3074
5b2ab72d
AE
3075 if (obj_request->type == OBJ_REQUEST_BIO)
3076 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3077 obj_request->bio_list);
3078 else
3079 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3080 obj_request->pages);
8b3e1a56
AE
3081 if (result)
3082 goto out_err;
3083
3084 img_request->callback = rbd_img_parent_read_callback;
3085 result = rbd_img_request_submit(img_request);
3086 if (result)
3087 goto out_err;
3088
3089 return;
3090out_err:
3091 if (img_request)
3092 rbd_img_request_put(img_request);
3093 obj_request->result = result;
3094 obj_request->xferred = 0;
3095 obj_request_done_set(obj_request);
3096}
bf0d5f50 3097
922dab61
ID
3098static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3099 u64 notifier_id, void *data, size_t data_len)
b8d70035 3100{
922dab61
ID
3101 struct rbd_device *rbd_dev = arg;
3102 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
e627db08 3103 int ret;
b8d70035 3104
922dab61
ID
3105 dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
3106 cookie, notify_id);
52bb1f9b
ID
3107
3108 /*
3109 * Until adequate refresh error handling is in place, there is
3110 * not much we can do here, except warn.
3111 *
3112 * See http://tracker.ceph.com/issues/5040
3113 */
e627db08
AE
3114 ret = rbd_dev_refresh(rbd_dev);
3115 if (ret)
9584d508 3116 rbd_warn(rbd_dev, "refresh failed: %d", ret);
b8d70035 3117
922dab61
ID
3118 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3119 &rbd_dev->header_oloc, notify_id, cookie,
3120 NULL, 0);
52bb1f9b 3121 if (ret)
9584d508 3122 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
b8d70035
AE
3123}
3124
99d16943
ID
3125static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3126
922dab61 3127static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3128{
922dab61 3129 struct rbd_device *rbd_dev = arg;
bb040aa0 3130
922dab61 3131 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3132
99d16943
ID
3133 mutex_lock(&rbd_dev->watch_mutex);
3134 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3135 __rbd_unregister_watch(rbd_dev);
3136 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3137
99d16943 3138 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3139 }
99d16943 3140 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3141}
3142
9969ebc5 3143/*
99d16943 3144 * watch_mutex must be locked
9969ebc5 3145 */
99d16943 3146static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3147{
3148 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3149 struct ceph_osd_linger_request *handle;
9969ebc5 3150
922dab61 3151 rbd_assert(!rbd_dev->watch_handle);
99d16943 3152 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3153
922dab61
ID
3154 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3155 &rbd_dev->header_oloc, rbd_watch_cb,
3156 rbd_watch_errcb, rbd_dev);
3157 if (IS_ERR(handle))
3158 return PTR_ERR(handle);
8eb87565 3159
922dab61 3160 rbd_dev->watch_handle = handle;
b30a01f2 3161 return 0;
b30a01f2
ID
3162}
3163
99d16943
ID
3164/*
3165 * watch_mutex must be locked
3166 */
3167static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3168{
922dab61
ID
3169 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3170 int ret;
b30a01f2 3171
99d16943
ID
3172 rbd_assert(rbd_dev->watch_handle);
3173 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3174
922dab61
ID
3175 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3176 if (ret)
3177 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3178
922dab61 3179 rbd_dev->watch_handle = NULL;
c525f036
ID
3180}
3181
99d16943
ID
3182static int rbd_register_watch(struct rbd_device *rbd_dev)
3183{
3184 int ret;
3185
3186 mutex_lock(&rbd_dev->watch_mutex);
3187 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3188 ret = __rbd_register_watch(rbd_dev);
3189 if (ret)
3190 goto out;
3191
3192 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3193 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3194
3195out:
3196 mutex_unlock(&rbd_dev->watch_mutex);
3197 return ret;
3198}
3199
3200static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3201{
99d16943
ID
3202 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3203
3204 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3205}
3206
3207static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3208{
3209 cancel_tasks_sync(rbd_dev);
3210
3211 mutex_lock(&rbd_dev->watch_mutex);
3212 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3213 __rbd_unregister_watch(rbd_dev);
3214 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3215 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3216
811c6688 3217 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3218}
3219
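/*
 * Editorial note: this delayed work is the periodic retry path named
 * in the commit subject.  rbd_watch_errcb() tears down a broken watch,
 * moves the device to RBD_WATCH_STATE_ERROR and queues this work; if
 * re-registration fails with anything other than -EBLACKLISTED the
 * work re-queues itself after RBD_RETRY_DELAY, and on success the
 * header is refreshed in case notifications were missed while the
 * watch was down.
 */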
99d16943
ID
3220static void rbd_reregister_watch(struct work_struct *work)
3221{
3222 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3223 struct rbd_device, watch_dwork);
3224 int ret;
3225
3226 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3227
3228 mutex_lock(&rbd_dev->watch_mutex);
3229 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
3230 goto fail_unlock;
3231
3232 ret = __rbd_register_watch(rbd_dev);
3233 if (ret) {
3234 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3235 if (ret != -EBLACKLISTED)
3236 queue_delayed_work(rbd_dev->task_wq,
3237 &rbd_dev->watch_dwork,
3238 RBD_RETRY_DELAY);
3239 goto fail_unlock;
3240 }
3241
3242 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3243 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3244 mutex_unlock(&rbd_dev->watch_mutex);
3245
3246 ret = rbd_dev_refresh(rbd_dev);
3247 if (ret)
 3248 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3249
3250 return;
3251
3252fail_unlock:
3253 mutex_unlock(&rbd_dev->watch_mutex);
3254}
3255
36be9a76 3256/*
f40eb349
AE
3257 * Synchronous osd object method call. Returns the number of bytes
 3258 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
3259 */
3260static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3261 const char *object_name,
3262 const char *class_name,
3263 const char *method_name,
4157976b 3264 const void *outbound,
36be9a76 3265 size_t outbound_size,
4157976b 3266 void *inbound,
e2a58ee5 3267 size_t inbound_size)
36be9a76 3268{
2169238d 3269 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 3270 struct rbd_obj_request *obj_request;
36be9a76
AE
3271 struct page **pages;
3272 u32 page_count;
3273 int ret;
3274
3275 /*
6010a451
AE
3276 * Method calls are ultimately read operations. The result
 3277 * should be placed into the inbound buffer provided. They
3278 * also supply outbound data--parameters for the object
3279 * method. Currently if this is present it will be a
3280 * snapshot id.
36be9a76 3281 */
57385b51 3282 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
3283 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3284 if (IS_ERR(pages))
3285 return PTR_ERR(pages);
3286
3287 ret = -ENOMEM;
6010a451 3288 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
3289 OBJ_REQUEST_PAGES);
3290 if (!obj_request)
3291 goto out;
3292
3293 obj_request->pages = pages;
3294 obj_request->page_count = page_count;
3295
6d2940c8 3296 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 3297 obj_request);
36be9a76
AE
3298 if (!obj_request->osd_req)
3299 goto out;
3300
c99d2d4a 3301 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
3302 class_name, method_name);
3303 if (outbound_size) {
3304 struct ceph_pagelist *pagelist;
3305
3306 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3307 if (!pagelist)
3308 goto out;
3309
3310 ceph_pagelist_init(pagelist);
3311 ceph_pagelist_append(pagelist, outbound, outbound_size);
3312 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3313 pagelist);
3314 }
a4ce40a9
AE
3315 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3316 obj_request->pages, inbound_size,
44cd188d 3317 0, false, false);
9d4df01f 3318 rbd_osd_req_format_read(obj_request);
430c28c3 3319
36be9a76
AE
3320 ret = rbd_obj_request_submit(osdc, obj_request);
3321 if (ret)
3322 goto out;
3323 ret = rbd_obj_request_wait(obj_request);
3324 if (ret)
3325 goto out;
3326
3327 ret = obj_request->result;
3328 if (ret < 0)
3329 goto out;
57385b51
AE
3330
3331 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3332 ret = (int)obj_request->xferred;
903bb32e 3333 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
3334out:
3335 if (obj_request)
3336 rbd_obj_request_put(obj_request);
3337 else
3338 ceph_release_page_vector(pages, page_count);
3339
3340 return ret;
3341}
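/*
 * Editorial sketch (hypothetical call, modeled on the size probe
 * elsewhere in this file; the buffer layout is illustrative):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	char size_buf[16];
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof(snapid),
 *				  size_buf, sizeof(size_buf));
 *
 * A non-negative return is the number of bytes placed in the inbound
 * buffer.
 */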
3342
7ad18afa 3343static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 3344{
7ad18afa
CH
3345 struct request *rq = blk_mq_rq_from_pdu(work);
3346 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 3347 struct rbd_img_request *img_request;
4e752f0a 3348 struct ceph_snap_context *snapc = NULL;
bc1ecc65
ID
3349 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3350 u64 length = blk_rq_bytes(rq);
6d2940c8 3351 enum obj_operation_type op_type;
4e752f0a 3352 u64 mapping_size;
bf0d5f50
AE
3353 int result;
3354
7ad18afa
CH
3355 if (rq->cmd_type != REQ_TYPE_FS) {
3356 dout("%s: non-fs request type %d\n", __func__,
3357 (int) rq->cmd_type);
3358 result = -EIO;
3359 goto err;
3360 }
3361
c2df40df 3362 if (req_op(rq) == REQ_OP_DISCARD)
90e98c52 3363 op_type = OBJ_OP_DISCARD;
c2df40df 3364 else if (req_op(rq) == REQ_OP_WRITE)
6d2940c8
GZ
3365 op_type = OBJ_OP_WRITE;
3366 else
3367 op_type = OBJ_OP_READ;
3368
bc1ecc65 3369 /* Ignore/skip any zero-length requests */
bf0d5f50 3370
bc1ecc65
ID
3371 if (!length) {
3372 dout("%s: zero-length request\n", __func__);
3373 result = 0;
3374 goto err_rq;
3375 }
bf0d5f50 3376
6d2940c8 3377 /* Only reads are allowed to a read-only device */
bc1ecc65 3378
6d2940c8 3379 if (op_type != OBJ_OP_READ) {
bc1ecc65
ID
3380 if (rbd_dev->mapping.read_only) {
3381 result = -EROFS;
3382 goto err_rq;
4dda41d3 3383 }
bc1ecc65
ID
3384 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3385 }
4dda41d3 3386
bc1ecc65
ID
3387 /*
3388 * Quit early if the mapped snapshot no longer exists. It's
3389 * still possible the snapshot will have disappeared by the
3390 * time our request arrives at the osd, but there's no sense in
3391 * sending it if we already know.
3392 */
3393 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
 3394 dout("request for non-existent snapshot\n");
3395 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3396 result = -ENXIO;
3397 goto err_rq;
3398 }
4dda41d3 3399
bc1ecc65
ID
3400 if (offset && length > U64_MAX - offset + 1) {
3401 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3402 length);
3403 result = -EINVAL;
3404 goto err_rq; /* Shouldn't happen */
3405 }
4dda41d3 3406
7ad18afa
CH
3407 blk_mq_start_request(rq);
3408
4e752f0a
JD
3409 down_read(&rbd_dev->header_rwsem);
3410 mapping_size = rbd_dev->mapping.size;
6d2940c8 3411 if (op_type != OBJ_OP_READ) {
4e752f0a
JD
3412 snapc = rbd_dev->header.snapc;
3413 ceph_get_snap_context(snapc);
3414 }
3415 up_read(&rbd_dev->header_rwsem);
3416
3417 if (offset + length > mapping_size) {
bc1ecc65 3418 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 3419 length, mapping_size);
bc1ecc65
ID
3420 result = -EIO;
3421 goto err_rq;
3422 }
bf0d5f50 3423
6d2940c8 3424 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 3425 snapc);
bc1ecc65
ID
3426 if (!img_request) {
3427 result = -ENOMEM;
3428 goto err_rq;
3429 }
3430 img_request->rq = rq;
70b16db8 3431 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 3432
90e98c52
GZ
3433 if (op_type == OBJ_OP_DISCARD)
3434 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3435 NULL);
3436 else
3437 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3438 rq->bio);
bc1ecc65
ID
3439 if (result)
3440 goto err_img_request;
bf0d5f50 3441
bc1ecc65
ID
3442 result = rbd_img_request_submit(img_request);
3443 if (result)
3444 goto err_img_request;
bf0d5f50 3445
bc1ecc65 3446 return;
bf0d5f50 3447
bc1ecc65
ID
3448err_img_request:
3449 rbd_img_request_put(img_request);
3450err_rq:
3451 if (result)
3452 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 3453 obj_op_name(op_type), length, offset, result);
e96a650a 3454 ceph_put_snap_context(snapc);
7ad18afa
CH
3455err:
3456 blk_mq_end_request(rq, result);
bc1ecc65 3457}
bf0d5f50 3458
7ad18afa
CH
3459static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3460 const struct blk_mq_queue_data *bd)
bc1ecc65 3461{
7ad18afa
CH
3462 struct request *rq = bd->rq;
3463 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 3464
7ad18afa
CH
3465 queue_work(rbd_wq, work);
3466 return BLK_MQ_RQ_QUEUE_OK;
bf0d5f50
AE
3467}
3468
602adf40
YS
3469static void rbd_free_disk(struct rbd_device *rbd_dev)
3470{
3471 struct gendisk *disk = rbd_dev->disk;
3472
3473 if (!disk)
3474 return;
3475
a0cab924
AE
3476 rbd_dev->disk = NULL;
3477 if (disk->flags & GENHD_FL_UP) {
602adf40 3478 del_gendisk(disk);
a0cab924
AE
3479 if (disk->queue)
3480 blk_cleanup_queue(disk->queue);
7ad18afa 3481 blk_mq_free_tag_set(&rbd_dev->tag_set);
a0cab924 3482 }
602adf40
YS
3483 put_disk(disk);
3484}
3485
788e2df3
AE
3486static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3487 const char *object_name,
7097f8df 3488 u64 offset, u64 length, void *buf)
788e2df3
AE
3489
3490{
2169238d 3491 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 3492 struct rbd_obj_request *obj_request;
788e2df3
AE
3493 struct page **pages = NULL;
3494 u32 page_count;
1ceae7ef 3495 size_t size;
788e2df3
AE
3496 int ret;
3497
3498 page_count = (u32) calc_pages_for(offset, length);
3499 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3500 if (IS_ERR(pages))
a8d42056 3501 return PTR_ERR(pages);
788e2df3
AE
3502
3503 ret = -ENOMEM;
3504 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3505 OBJ_REQUEST_PAGES);
788e2df3
AE
3506 if (!obj_request)
3507 goto out;
3508
3509 obj_request->pages = pages;
3510 obj_request->page_count = page_count;
3511
6d2940c8 3512 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 3513 obj_request);
788e2df3
AE
3514 if (!obj_request->osd_req)
3515 goto out;
3516
c99d2d4a
AE
3517 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3518 offset, length, 0, 0);
406e2c9f 3519 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3520 obj_request->pages,
44cd188d
AE
3521 obj_request->length,
3522 obj_request->offset & ~PAGE_MASK,
3523 false, false);
9d4df01f 3524 rbd_osd_req_format_read(obj_request);
430c28c3 3525
788e2df3
AE
3526 ret = rbd_obj_request_submit(osdc, obj_request);
3527 if (ret)
3528 goto out;
3529 ret = rbd_obj_request_wait(obj_request);
3530 if (ret)
3531 goto out;
3532
3533 ret = obj_request->result;
3534 if (ret < 0)
3535 goto out;
1ceae7ef
AE
3536
3537 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3538 size = (size_t) obj_request->xferred;
903bb32e 3539 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
3540 rbd_assert(size <= (size_t)INT_MAX);
3541 ret = (int)size;
788e2df3
AE
3542out:
3543 if (obj_request)
3544 rbd_obj_request_put(obj_request);
3545 else
3546 ceph_release_page_vector(pages, page_count);
3547
3548 return ret;
3549}
3550
602adf40 3551/*
662518b1
AE
3552 * Read the complete header for the given rbd device. On successful
3553 * return, the rbd_dev->header field will contain up-to-date
3554 * information about the image.
602adf40 3555 */
99a41ebc 3556static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3557{
4156d998 3558 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3559 u32 snap_count = 0;
4156d998
AE
3560 u64 names_size = 0;
3561 u32 want_count;
3562 int ret;
602adf40 3563
00f1f36f 3564 /*
4156d998
AE
3565 * The complete header will include an array of its 64-bit
3566 * snapshot ids, followed by the names of those snapshots as
3567 * a contiguous block of NUL-terminated strings. Note that
3568 * the number of snapshots could change by the time we read
3569 * it in, in which case we re-read it.
00f1f36f 3570 */
4156d998
AE
3571 do {
3572 size_t size;
3573
3574 kfree(ondisk);
3575
3576 size = sizeof (*ondisk);
3577 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3578 size += names_size;
3579 ondisk = kmalloc(size, GFP_KERNEL);
3580 if (!ondisk)
662518b1 3581 return -ENOMEM;
4156d998 3582
c41d13a3 3583 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
7097f8df 3584 0, size, ondisk);
4156d998 3585 if (ret < 0)
662518b1 3586 goto out;
c0cd10db 3587 if ((size_t)ret < size) {
4156d998 3588 ret = -ENXIO;
06ecc6cb
AE
3589 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3590 size, ret);
662518b1 3591 goto out;
4156d998
AE
3592 }
3593 if (!rbd_dev_ondisk_valid(ondisk)) {
3594 ret = -ENXIO;
06ecc6cb 3595 rbd_warn(rbd_dev, "invalid header");
662518b1 3596 goto out;
81e759fb 3597 }
602adf40 3598
4156d998
AE
3599 names_size = le64_to_cpu(ondisk->snap_names_len);
3600 want_count = snap_count;
3601 snap_count = le32_to_cpu(ondisk->snap_count);
3602 } while (snap_count != want_count);
00f1f36f 3603
662518b1
AE
3604 ret = rbd_header_from_disk(rbd_dev, ondisk);
3605out:
4156d998
AE
3606 kfree(ondisk);
3607
3608 return ret;
602adf40
YS
3609}
3610
15228ede
AE
3611/*
3612 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3613 * has disappeared from the (just updated) snapshot context.
3614 */
3615static void rbd_exists_validate(struct rbd_device *rbd_dev)
3616{
3617 u64 snap_id;
3618
3619 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3620 return;
3621
3622 snap_id = rbd_dev->spec->snap_id;
3623 if (snap_id == CEPH_NOSNAP)
3624 return;
3625
3626 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3627 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3628}
3629
9875201e
JD
3630static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3631{
3632 sector_t size;
9875201e
JD
3633
3634 /*
811c6688
ID
3635 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3636 * try to update its size. If REMOVING is set, updating size
3637 * is just useless work since the device can't be opened.
9875201e 3638 */
811c6688
ID
3639 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3640 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
9875201e
JD
3641 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3642 dout("setting size to %llu sectors", (unsigned long long)size);
3643 set_capacity(rbd_dev->disk, size);
3644 revalidate_disk(rbd_dev->disk);
3645 }
3646}
3647
cc4a38bd 3648static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3649{
e627db08 3650 u64 mapping_size;
1fe5e993
AE
3651 int ret;
3652
cfbf6377 3653 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 3654 mapping_size = rbd_dev->mapping.size;
a720ae09
ID
3655
3656 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 3657 if (ret)
73e39e4d 3658 goto out;
15228ede 3659
e8f59b59
ID
3660 /*
3661 * If there is a parent, see if it has disappeared due to the
3662 * mapped image getting flattened.
3663 */
3664 if (rbd_dev->parent) {
3665 ret = rbd_dev_v2_parent_info(rbd_dev);
3666 if (ret)
73e39e4d 3667 goto out;
e8f59b59
ID
3668 }
3669
5ff1108c 3670 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 3671 rbd_dev->mapping.size = rbd_dev->header.image_size;
5ff1108c
ID
3672 } else {
3673 /* validate mapped snapshot's EXISTS flag */
3674 rbd_exists_validate(rbd_dev);
3675 }
15228ede 3676
73e39e4d 3677out:
cfbf6377 3678 up_write(&rbd_dev->header_rwsem);
73e39e4d 3679 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 3680 rbd_dev_update_size(rbd_dev);
1fe5e993 3681
73e39e4d 3682 return ret;
1fe5e993
AE
3683}
3684
7ad18afa
CH
3685static int rbd_init_request(void *data, struct request *rq,
3686 unsigned int hctx_idx, unsigned int request_idx,
3687 unsigned int numa_node)
3688{
3689 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3690
3691 INIT_WORK(work, rbd_queue_workfn);
3692 return 0;
3693}
3694
3695static struct blk_mq_ops rbd_mq_ops = {
3696 .queue_rq = rbd_queue_rq,
3697 .map_queue = blk_mq_map_queue,
3698 .init_request = rbd_init_request,
3699};
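/*
 * Editor's note: a sketch of the per-request context plumbing behind
 * the blk-mq ops above. tag_set.cmd_size (set in rbd_init_disk()
 * below) makes blk-mq reserve a struct work_struct behind every
 * request, so the pdu helpers convert between the two with no extra
 * allocation:
 *
 *	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 *	struct request *back = blk_mq_rq_from_pdu(work);  // back == rq
 *
 * rbd_queue_rq() queues that embedded work item; rbd_queue_workfn()
 * recovers the request with the inverse mapping.
 */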
3700
602adf40
YS
3701static int rbd_init_disk(struct rbd_device *rbd_dev)
3702{
3703 struct gendisk *disk;
3704 struct request_queue *q;
593a9e7b 3705 u64 segment_size;
7ad18afa 3706 int err;
602adf40 3707
602adf40 3708 /* create gendisk info */
7e513d43
ID
3709 disk = alloc_disk(single_major ?
3710 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3711 RBD_MINORS_PER_MAJOR);
602adf40 3712 if (!disk)
1fcdb8aa 3713 return -ENOMEM;
602adf40 3714
f0f8cef5 3715 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3716 rbd_dev->dev_id);
602adf40 3717 disk->major = rbd_dev->major;
dd82fff1 3718 disk->first_minor = rbd_dev->minor;
7e513d43
ID
3719 if (single_major)
3720 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
3721 disk->fops = &rbd_bd_ops;
3722 disk->private_data = rbd_dev;
3723
7ad18afa
CH
3724 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3725 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 3726 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 3727 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 3728 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
7ad18afa
CH
3729 rbd_dev->tag_set.nr_hw_queues = 1;
3730 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3731
3732 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3733 if (err)
602adf40 3734 goto out_disk;
029bcbd8 3735
7ad18afa
CH
3736 q = blk_mq_init_queue(&rbd_dev->tag_set);
3737 if (IS_ERR(q)) {
3738 err = PTR_ERR(q);
3739 goto out_tag_set;
3740 }
3741
d8a2c89c
ID
3742 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
3743 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 3744
029bcbd8 3745 /* set io sizes to object size */
593a9e7b
AE
3746 segment_size = rbd_obj_bytes(&rbd_dev->header);
3747 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 3748 q->limits.max_sectors = queue_max_hw_sectors(q);
d3834fef 3749 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
593a9e7b
AE
3750 blk_queue_max_segment_size(q, segment_size);
3751 blk_queue_io_min(q, segment_size);
3752 blk_queue_io_opt(q, segment_size);
029bcbd8 3753
90e98c52
GZ
3754 /* enable the discard support */
3755 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3756 q->limits.discard_granularity = segment_size;
3757 q->limits.discard_alignment = segment_size;
2bb4cd5c 3758 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
b76f8239 3759 q->limits.discard_zeroes_data = 1;
90e98c52 3760
bae818ee
RH
3761 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
3762 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
3763
602adf40
YS
3764 disk->queue = q;
3765
3766 q->queuedata = rbd_dev;
3767
3768 rbd_dev->disk = disk;
602adf40 3769
602adf40 3770 return 0;
7ad18afa
CH
3771out_tag_set:
3772 blk_mq_free_tag_set(&rbd_dev->tag_set);
602adf40
YS
3773out_disk:
3774 put_disk(disk);
7ad18afa 3775 return err;
602adf40
YS
3776}
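/*
 * Editor's note: a worked example of the queue limits set above,
 * assuming the default rbd object order of 22 (4 MiB objects):
 *
 *	segment_size        = 1 << 22        = 4194304 bytes
 *	max_hw_sectors      = 4194304 / 512  = 8192 sectors
 *	discard_granularity = 4194304 bytes
 *
 * so no single request (or discard) is larger than one backing object.
 */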
3777
dfc5606d
YS
3778/*
3779 sysfs
3780*/
3781
593a9e7b
AE
3782static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3783{
3784 return container_of(dev, struct rbd_device, dev);
3785}
3786
dfc5606d
YS
3787static ssize_t rbd_size_show(struct device *dev,
3788 struct device_attribute *attr, char *buf)
3789{
593a9e7b 3790 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3791
fc71d833
AE
3792 return sprintf(buf, "%llu\n",
3793 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3794}
3795
34b13184
AE
3796/*
3797 * Note this shows the features for whatever's mapped, which is not
3798 * necessarily the base image.
3799 */
3800static ssize_t rbd_features_show(struct device *dev,
3801 struct device_attribute *attr, char *buf)
3802{
3803 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3804
3805 return sprintf(buf, "0x%016llx\n",
fc71d833 3806 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3807}
3808
dfc5606d
YS
3809static ssize_t rbd_major_show(struct device *dev,
3810 struct device_attribute *attr, char *buf)
3811{
593a9e7b 3812 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3813
fc71d833
AE
3814 if (rbd_dev->major)
3815 return sprintf(buf, "%d\n", rbd_dev->major);
3816
3817 return sprintf(buf, "(none)\n");
dd82fff1
ID
3818}
3819
3820static ssize_t rbd_minor_show(struct device *dev,
3821 struct device_attribute *attr, char *buf)
3822{
3823 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 3824
dd82fff1 3825 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
3826}
3827
3828static ssize_t rbd_client_id_show(struct device *dev,
3829 struct device_attribute *attr, char *buf)
602adf40 3830{
593a9e7b 3831 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3832
1dbb4399 3833 return sprintf(buf, "client%lld\n",
033268a5 3834 ceph_client_gid(rbd_dev->rbd_client->client));
602adf40
YS
3835}
3836
dfc5606d
YS
3837static ssize_t rbd_pool_show(struct device *dev,
3838 struct device_attribute *attr, char *buf)
602adf40 3839{
593a9e7b 3840 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3841
0d7dbfce 3842 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3843}
3844
9bb2f334
AE
3845static ssize_t rbd_pool_id_show(struct device *dev,
3846 struct device_attribute *attr, char *buf)
3847{
3848 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3849
0d7dbfce 3850 return sprintf(buf, "%llu\n",
fc71d833 3851 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3852}
3853
dfc5606d
YS
3854static ssize_t rbd_name_show(struct device *dev,
3855 struct device_attribute *attr, char *buf)
3856{
593a9e7b 3857 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3858
a92ffdf8
AE
3859 if (rbd_dev->spec->image_name)
3860 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3861
3862 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3863}
3864
589d30e0
AE
3865static ssize_t rbd_image_id_show(struct device *dev,
3866 struct device_attribute *attr, char *buf)
3867{
3868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3869
0d7dbfce 3870 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3871}
3872
34b13184
AE
3873/*
3874 * Shows the name of the currently-mapped snapshot (or
3875 * RBD_SNAP_HEAD_NAME for the base image).
3876 */
dfc5606d
YS
3877static ssize_t rbd_snap_show(struct device *dev,
3878 struct device_attribute *attr,
3879 char *buf)
3880{
593a9e7b 3881 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3882
0d7dbfce 3883 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3884}
3885
86b00e0d 3886/*
ff96128f
ID
3887 * For a v2 image, shows the chain of parent images, separated by empty
3888 * lines. For v1 images or if there is no parent, shows "(no parent
3889 * image)".
86b00e0d
AE
3890 */
3891static ssize_t rbd_parent_show(struct device *dev,
ff96128f
ID
3892 struct device_attribute *attr,
3893 char *buf)
86b00e0d
AE
3894{
3895 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 3896 ssize_t count = 0;
86b00e0d 3897
ff96128f 3898 if (!rbd_dev->parent)
86b00e0d
AE
3899 return sprintf(buf, "(no parent image)\n");
3900
ff96128f
ID
3901 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3902 struct rbd_spec *spec = rbd_dev->parent_spec;
3903
3904 count += sprintf(&buf[count], "%s"
3905 "pool_id %llu\npool_name %s\n"
3906 "image_id %s\nimage_name %s\n"
3907 "snap_id %llu\nsnap_name %s\n"
3908 "overlap %llu\n",
3909 !count ? "" : "\n", /* first? */
3910 spec->pool_id, spec->pool_name,
3911 spec->image_id, spec->image_name ?: "(unknown)",
3912 spec->snap_id, spec->snap_name,
3913 rbd_dev->parent_overlap);
3914 }
3915
3916 return count;
86b00e0d
AE
3917}
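/*
 * Editor's note: for a clone, the format string above produces output
 * like the following (all values illustrative):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 10086b8b4567
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 1073741824
 *
 * with one such block per ancestor, separated by empty lines.
 */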
3918
dfc5606d
YS
3919static ssize_t rbd_image_refresh(struct device *dev,
3920 struct device_attribute *attr,
3921 const char *buf,
3922 size_t size)
3923{
593a9e7b 3924 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3925 int ret;
602adf40 3926
cc4a38bd 3927 ret = rbd_dev_refresh(rbd_dev);
e627db08 3928 if (ret)
52bb1f9b 3929 return ret;
b813623a 3930
52bb1f9b 3931 return size;
dfc5606d 3932}
602adf40 3933
dfc5606d 3934static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3935static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 3936static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 3937static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
dfc5606d
YS
3938static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3939static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3940static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3941static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3942static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3943static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3944static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3945static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3946
3947static struct attribute *rbd_attrs[] = {
3948 &dev_attr_size.attr,
34b13184 3949 &dev_attr_features.attr,
dfc5606d 3950 &dev_attr_major.attr,
dd82fff1 3951 &dev_attr_minor.attr,
dfc5606d
YS
3952 &dev_attr_client_id.attr,
3953 &dev_attr_pool.attr,
9bb2f334 3954 &dev_attr_pool_id.attr,
dfc5606d 3955 &dev_attr_name.attr,
589d30e0 3956 &dev_attr_image_id.attr,
dfc5606d 3957 &dev_attr_current_snap.attr,
86b00e0d 3958 &dev_attr_parent.attr,
dfc5606d 3959 &dev_attr_refresh.attr,
dfc5606d
YS
3960 NULL
3961};
3962
3963static struct attribute_group rbd_attr_group = {
3964 .attrs = rbd_attrs,
3965};
3966
3967static const struct attribute_group *rbd_attr_groups[] = {
3968 &rbd_attr_group,
3969 NULL
3970};
3971
6cac4695 3972static void rbd_dev_release(struct device *dev);
dfc5606d
YS
3973
3974static struct device_type rbd_device_type = {
3975 .name = "rbd",
3976 .groups = rbd_attr_groups,
6cac4695 3977 .release = rbd_dev_release,
dfc5606d
YS
3978};
3979
8b8fb99c
AE
3980static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3981{
3982 kref_get(&spec->kref);
3983
3984 return spec;
3985}
3986
3987static void rbd_spec_free(struct kref *kref);
3988static void rbd_spec_put(struct rbd_spec *spec)
3989{
3990 if (spec)
3991 kref_put(&spec->kref, rbd_spec_free);
3992}
3993
3994static struct rbd_spec *rbd_spec_alloc(void)
3995{
3996 struct rbd_spec *spec;
3997
3998 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3999 if (!spec)
4000 return NULL;
04077599
ID
4001
4002 spec->pool_id = CEPH_NOPOOL;
4003 spec->snap_id = CEPH_NOSNAP;
8b8fb99c
AE
4004 kref_init(&spec->kref);
4005
8b8fb99c
AE
4006 return spec;
4007}
4008
4009static void rbd_spec_free(struct kref *kref)
4010{
4011 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4012
4013 kfree(spec->pool_name);
4014 kfree(spec->image_id);
4015 kfree(spec->image_name);
4016 kfree(spec->snap_name);
4017 kfree(spec);
4018}
4019
1643dfa4 4020static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4021{
99d16943
ID
4022 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4023
c41d13a3 4024 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4025 ceph_oloc_destroy(&rbd_dev->header_oloc);
c41d13a3 4026
dd5ac32d
ID
4027 rbd_put_client(rbd_dev->rbd_client);
4028 rbd_spec_put(rbd_dev->spec);
4029 kfree(rbd_dev->opts);
4030 kfree(rbd_dev);
1643dfa4
ID
4031}
4032
4033static void rbd_dev_release(struct device *dev)
4034{
4035 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4036 bool need_put = !!rbd_dev->opts;
4037
4038 if (need_put) {
4039 destroy_workqueue(rbd_dev->task_wq);
4040 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4041 }
4042
4043 rbd_dev_free(rbd_dev);
dd5ac32d
ID
4044
4045 /*
4046 * This is racy, but way better than dropping the module ref outside
4047 * of the release callback. The race window is pretty small, so
4048 * doing something similar to dm (dm-builtin.c) is overkill.
4049 */
4050 if (need_put)
4051 module_put(THIS_MODULE);
4052}
4053
1643dfa4
ID
4054static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4055 struct rbd_spec *spec)
c53d5893
AE
4056{
4057 struct rbd_device *rbd_dev;
4058
1643dfa4 4059 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
c53d5893
AE
4060 if (!rbd_dev)
4061 return NULL;
4062
4063 spin_lock_init(&rbd_dev->lock);
4064 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4065 init_rwsem(&rbd_dev->header_rwsem);
4066
c41d13a3 4067 ceph_oid_init(&rbd_dev->header_oid);
922dab61 4068 ceph_oloc_init(&rbd_dev->header_oloc);
c41d13a3 4069
99d16943
ID
4070 mutex_init(&rbd_dev->watch_mutex);
4071 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4072 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4073
dd5ac32d
ID
4074 rbd_dev->dev.bus = &rbd_bus_type;
4075 rbd_dev->dev.type = &rbd_device_type;
4076 rbd_dev->dev.parent = &rbd_root_dev;
dd5ac32d
ID
4077 device_initialize(&rbd_dev->dev);
4078
c53d5893 4079 rbd_dev->rbd_client = rbdc;
d147543d 4080 rbd_dev->spec = spec;
0903e875 4081
7627151e
YZ
4082 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4083 rbd_dev->layout.stripe_count = 1;
4084 rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4085 rbd_dev->layout.pool_id = spec->pool_id;
30c156d9 4086 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
0903e875 4087
1643dfa4
ID
4088 return rbd_dev;
4089}
4090
4091/*
4092 * Create a mapping rbd_dev.
4093 */
4094static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4095 struct rbd_spec *spec,
4096 struct rbd_options *opts)
4097{
4098 struct rbd_device *rbd_dev;
4099
4100 rbd_dev = __rbd_dev_create(rbdc, spec);
4101 if (!rbd_dev)
4102 return NULL;
4103
4104 rbd_dev->opts = opts;
4105
4106 /* get an id and fill in device name */
4107 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4108 minor_to_rbd_dev_id(1 << MINORBITS),
4109 GFP_KERNEL);
4110 if (rbd_dev->dev_id < 0)
4111 goto fail_rbd_dev;
4112
4113 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4114 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4115 rbd_dev->name);
4116 if (!rbd_dev->task_wq)
4117 goto fail_dev_id;
dd5ac32d 4118
1643dfa4
ID
4119 /* we have a ref from do_rbd_add() */
4120 __module_get(THIS_MODULE);
4121
4122 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4123 return rbd_dev;
1643dfa4
ID
4124
4125fail_dev_id:
4126 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4127fail_rbd_dev:
4128 rbd_dev_free(rbd_dev);
4129 return NULL;
c53d5893
AE
4130}
4131
4132static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4133{
dd5ac32d
ID
4134 if (rbd_dev)
4135 put_device(&rbd_dev->dev);
c53d5893
AE
4136}
4137
9d475de5
AE
4138/*
4139 * Get the size and object order for an image snapshot, or if
4140 * snap_id is CEPH_NOSNAP, gets this information for the base
4141 * image.
4142 */
4143static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4144 u8 *order, u64 *snap_size)
4145{
4146 __le64 snapid = cpu_to_le64(snap_id);
4147 int ret;
4148 struct {
4149 u8 order;
4150 __le64 size;
4151 } __attribute__ ((packed)) size_buf = { 0 };
4152
c41d13a3 4153 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
9d475de5 4154 "rbd", "get_size",
4157976b 4155 &snapid, sizeof (snapid),
e2a58ee5 4156 &size_buf, sizeof (size_buf));
36be9a76 4157 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
4158 if (ret < 0)
4159 return ret;
57385b51
AE
4160 if (ret < sizeof (size_buf))
4161 return -ERANGE;
9d475de5 4162
c3545579 4163 if (order) {
c86f86e9 4164 *order = size_buf.order;
c3545579
JD
4165 dout(" order %u", (unsigned int)*order);
4166 }
9d475de5
AE
4167 *snap_size = le64_to_cpu(size_buf.size);
4168
c3545579
JD
4169 dout(" snap_id 0x%016llx snap_size = %llu\n",
4170 (unsigned long long)snap_id,
57385b51 4171 (unsigned long long)*snap_size);
9d475de5
AE
4172
4173 return 0;
4174}
4175
4176static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4177{
4178 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4179 &rbd_dev->header.obj_order,
4180 &rbd_dev->header.image_size);
4181}
4182
1e130199
AE
4183static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4184{
4185 void *reply_buf;
4186 int ret;
4187 void *p;
4188
4189 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4190 if (!reply_buf)
4191 return -ENOMEM;
4192
c41d13a3 4193 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 4194 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 4195 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4196 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
4197 if (ret < 0)
4198 goto out;
4199
4200 p = reply_buf;
4201 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
4202 p + ret, NULL, GFP_NOIO);
4203 ret = 0;
1e130199
AE
4204
4205 if (IS_ERR(rbd_dev->header.object_prefix)) {
4206 ret = PTR_ERR(rbd_dev->header.object_prefix);
4207 rbd_dev->header.object_prefix = NULL;
4208 } else {
4209 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4210 }
1e130199
AE
4211out:
4212 kfree(reply_buf);
4213
4214 return ret;
4215}
4216
b1b5402a
AE
4217static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4218 u64 *snap_features)
4219{
4220 __le64 snapid = cpu_to_le64(snap_id);
4221 struct {
4222 __le64 features;
4223 __le64 incompat;
4157976b 4224 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4225 u64 unsup;
b1b5402a
AE
4226 int ret;
4227
c41d13a3 4228 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b1b5402a 4229 "rbd", "get_features",
4157976b 4230 &snapid, sizeof (snapid),
e2a58ee5 4231 &features_buf, sizeof (features_buf));
36be9a76 4232 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
4233 if (ret < 0)
4234 return ret;
57385b51
AE
4235 if (ret < sizeof (features_buf))
4236 return -ERANGE;
d889140c 4237
d3767f0f
ID
4238 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4239 if (unsup) {
4240 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4241 unsup);
b8f5c6ed 4242 return -ENXIO;
d3767f0f 4243 }
d889140c 4244
b1b5402a
AE
4245 *snap_features = le64_to_cpu(features_buf.features);
4246
4247 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
4248 (unsigned long long)snap_id,
4249 (unsigned long long)*snap_features,
4250 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
4251
4252 return 0;
4253}
4254
4255static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4256{
4257 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4258 &rbd_dev->header.features);
4259}
4260
86b00e0d
AE
4261static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4262{
4263 struct rbd_spec *parent_spec;
4264 size_t size;
4265 void *reply_buf = NULL;
4266 __le64 snapid;
4267 void *p;
4268 void *end;
642a2537 4269 u64 pool_id;
86b00e0d 4270 char *image_id;
3b5cf2a2 4271 u64 snap_id;
86b00e0d 4272 u64 overlap;
86b00e0d
AE
4273 int ret;
4274
4275 parent_spec = rbd_spec_alloc();
4276 if (!parent_spec)
4277 return -ENOMEM;
4278
4279 size = sizeof (__le64) + /* pool_id */
4280 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4281 sizeof (__le64) + /* snap_id */
4282 sizeof (__le64); /* overlap */
4283 reply_buf = kmalloc(size, GFP_KERNEL);
4284 if (!reply_buf) {
4285 ret = -ENOMEM;
4286 goto out_err;
4287 }
4288
4d9b67cd 4289 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
c41d13a3 4290 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
86b00e0d 4291 "rbd", "get_parent",
4157976b 4292 &snapid, sizeof (snapid),
e2a58ee5 4293 reply_buf, size);
36be9a76 4294 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
4295 if (ret < 0)
4296 goto out_err;
4297
86b00e0d 4298 p = reply_buf;
57385b51
AE
4299 end = reply_buf + ret;
4300 ret = -ERANGE;
642a2537 4301 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
4302 if (pool_id == CEPH_NOPOOL) {
4303 /*
4304 * Either the parent never existed, or we have
4305 * record of it but the image got flattened so it no
4306 * longer has a parent. When the parent of a
4307 * layered image disappears we immediately set the
4308 * overlap to 0. The effect of this is that all new
4309 * requests will be treated as if the image had no
4310 * parent.
4311 */
4312 if (rbd_dev->parent_overlap) {
4313 rbd_dev->parent_overlap = 0;
392a9dad
AE
4314 rbd_dev_parent_put(rbd_dev);
4315 pr_info("%s: clone image has been flattened\n",
4316 rbd_dev->disk->disk_name);
4317 }
4318
86b00e0d 4319 goto out; /* No parent? No problem. */
392a9dad 4320 }
86b00e0d 4321
0903e875
AE
4322 /* The ceph file layout needs to fit pool id in 32 bits */
4323
4324 ret = -EIO;
642a2537 4325 if (pool_id > (u64)U32_MAX) {
9584d508 4326 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 4327 (unsigned long long)pool_id, U32_MAX);
57385b51 4328 goto out_err;
c0cd10db 4329 }
0903e875 4330
979ed480 4331 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
4332 if (IS_ERR(image_id)) {
4333 ret = PTR_ERR(image_id);
4334 goto out_err;
4335 }
3b5cf2a2 4336 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
4337 ceph_decode_64_safe(&p, end, overlap, out_err);
4338
3b5cf2a2
AE
4339 /*
4340 * The parent won't change (except when the clone is
4341 * flattened, which is handled above). So we only need to
4342 * record the parent spec if we have not already done so.
4343 */
4344 if (!rbd_dev->parent_spec) {
4345 parent_spec->pool_id = pool_id;
4346 parent_spec->image_id = image_id;
4347 parent_spec->snap_id = snap_id;
70cf49cf
AE
4348 rbd_dev->parent_spec = parent_spec;
4349 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
4350 } else {
4351 kfree(image_id);
3b5cf2a2
AE
4352 }
4353
4354 /*
cf32bd9c
ID
4355 * We always update the parent overlap. If it's zero we issue
4356 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 4357 */
3b5cf2a2 4358 if (!overlap) {
3b5cf2a2 4359 if (parent_spec) {
cf32bd9c
ID
4360 /* refresh, careful to warn just once */
4361 if (rbd_dev->parent_overlap)
4362 rbd_warn(rbd_dev,
4363 "clone now standalone (overlap became 0)");
3b5cf2a2 4364 } else {
cf32bd9c
ID
4365 /* initial probe */
4366 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 4367 }
70cf49cf 4368 }
cf32bd9c
ID
4369 rbd_dev->parent_overlap = overlap;
4370
86b00e0d
AE
4371out:
4372 ret = 0;
4373out_err:
4374 kfree(reply_buf);
4375 rbd_spec_put(parent_spec);
4376
4377 return ret;
4378}
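/*
 * Editor's note: the get_parent reply decoded above has this layout,
 * in order (integers little-endian, the string encoded as a 32-bit
 * length followed by that many bytes):
 *
 *	__le64 pool_id;				CEPH_NOPOOL if no parent
 *	__le32 image_id_len;
 *	char   image_id[image_id_len];
 *	__le64 snap_id;
 *	__le64 overlap;
 */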
4379
cc070d59
AE
4380static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4381{
4382 struct {
4383 __le64 stripe_unit;
4384 __le64 stripe_count;
4385 } __attribute__ ((packed)) striping_info_buf = { 0 };
4386 size_t size = sizeof (striping_info_buf);
4387 void *p;
4388 u64 obj_size;
4389 u64 stripe_unit;
4390 u64 stripe_count;
4391 int ret;
4392
c41d13a3 4393 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
cc070d59 4394 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 4395 (char *)&striping_info_buf, size);
cc070d59
AE
4396 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4397 if (ret < 0)
4398 return ret;
4399 if (ret < size)
4400 return -ERANGE;
4401
4402 /*
4403 * We don't actually support the "fancy striping" feature
4404 * (STRIPINGV2) yet, but if the striping sizes are the
4405 * defaults the behavior is the same as before. So find
4406 * out, and only fail if the image has non-default values.
4407 */
4408 ret = -EINVAL;
4409 obj_size = (u64)1 << rbd_dev->header.obj_order;
4410 p = &striping_info_buf;
4411 stripe_unit = ceph_decode_64(&p);
4412 if (stripe_unit != obj_size) {
4413 rbd_warn(rbd_dev, "unsupported stripe unit "
4414 "(got %llu want %llu)",
4415 stripe_unit, obj_size);
4416 return -EINVAL;
4417 }
4418 stripe_count = ceph_decode_64(&p);
4419 if (stripe_count != 1) {
4420 rbd_warn(rbd_dev, "unsupported stripe count "
4421 "(got %llu want 1)", stripe_count);
4422 return -EINVAL;
4423 }
500d0c0f
AE
4424 rbd_dev->header.stripe_unit = stripe_unit;
4425 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
4426
4427 return 0;
4428}
4429
9e15b77d
AE
4430static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4431{
4432 size_t image_id_size;
4433 char *image_id;
4434 void *p;
4435 void *end;
4436 size_t size;
4437 void *reply_buf = NULL;
4438 size_t len = 0;
4439 char *image_name = NULL;
4440 int ret;
4441
4442 rbd_assert(!rbd_dev->spec->image_name);
4443
69e7a02f
AE
4444 len = strlen(rbd_dev->spec->image_id);
4445 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4446 image_id = kmalloc(image_id_size, GFP_KERNEL);
4447 if (!image_id)
4448 return NULL;
4449
4450 p = image_id;
4157976b 4451 end = image_id + image_id_size;
57385b51 4452 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4453
4454 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4455 reply_buf = kmalloc(size, GFP_KERNEL);
4456 if (!reply_buf)
4457 goto out;
4458
36be9a76 4459 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4460 "rbd", "dir_get_name",
4461 image_id, image_id_size,
e2a58ee5 4462 reply_buf, size);
9e15b77d
AE
4463 if (ret < 0)
4464 goto out;
4465 p = reply_buf;
f40eb349
AE
4466 end = reply_buf + ret;
4467
9e15b77d
AE
4468 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4469 if (IS_ERR(image_name))
4470 image_name = NULL;
4471 else
4472 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4473out:
4474 kfree(reply_buf);
4475 kfree(image_id);
4476
4477 return image_name;
4478}
4479
2ad3d716
AE
4480static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4481{
4482 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4483 const char *snap_name;
4484 u32 which = 0;
4485
4486 /* Skip over names until we find the one we are looking for */
4487
4488 snap_name = rbd_dev->header.snap_names;
4489 while (which < snapc->num_snaps) {
4490 if (!strcmp(name, snap_name))
4491 return snapc->snaps[which];
4492 snap_name += strlen(snap_name) + 1;
4493 which++;
4494 }
4495 return CEPH_NOSNAP;
4496}
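/*
 * Editor's note: an illustration of the v1 layout walked above. Index
 * i of snapc->snaps[] pairs with the i-th NUL-terminated string in the
 * contiguous snap_names block (ids and names here are made up):
 *
 *	snapc->snaps[]    = { 12, 7, 3 }
 *	header.snap_names = "newest\0older\0oldest\0"
 */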
4497
4498static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4499{
4500 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4501 u32 which;
4502 bool found = false;
4503 u64 snap_id;
4504
4505 for (which = 0; !found && which < snapc->num_snaps; which++) {
4506 const char *snap_name;
4507
4508 snap_id = snapc->snaps[which];
4509 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
4510 if (IS_ERR(snap_name)) {
4511 /* ignore no-longer existing snapshots */
4512 if (PTR_ERR(snap_name) == -ENOENT)
4513 continue;
4514 else
4515 break;
4516 }
2ad3d716
AE
4517 found = !strcmp(name, snap_name);
4518 kfree(snap_name);
4519 }
4520 return found ? snap_id : CEPH_NOSNAP;
4521}
4522
4523/*
4524 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4525 * no snapshot by that name is found, or if an error occurs.
4526 */
4527static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4528{
4529 if (rbd_dev->image_format == 1)
4530 return rbd_v1_snap_id_by_name(rbd_dev, name);
4531
4532 return rbd_v2_snap_id_by_name(rbd_dev, name);
4533}
4534
9e15b77d 4535/*
04077599
ID
4536 * An image being mapped will have everything but the snap id.
4537 */
4538static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4539{
4540 struct rbd_spec *spec = rbd_dev->spec;
4541
4542 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4543 rbd_assert(spec->image_id && spec->image_name);
4544 rbd_assert(spec->snap_name);
4545
4546 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4547 u64 snap_id;
4548
4549 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4550 if (snap_id == CEPH_NOSNAP)
4551 return -ENOENT;
4552
4553 spec->snap_id = snap_id;
4554 } else {
4555 spec->snap_id = CEPH_NOSNAP;
4556 }
4557
4558 return 0;
4559}
4560
4561/*
4562 * A parent image will have all ids but none of the names.
e1d4213f 4563 *
04077599
ID
4564 * All names in an rbd spec are dynamically allocated. It's OK if we
4565 * can't figure out the name for an image id.
9e15b77d 4566 */
04077599 4567static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 4568{
2e9f7f1c
AE
4569 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4570 struct rbd_spec *spec = rbd_dev->spec;
4571 const char *pool_name;
4572 const char *image_name;
4573 const char *snap_name;
9e15b77d
AE
4574 int ret;
4575
04077599
ID
4576 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4577 rbd_assert(spec->image_id);
4578 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 4579
2e9f7f1c 4580 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4581
2e9f7f1c
AE
4582 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4583 if (!pool_name) {
4584 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4585 return -EIO;
4586 }
2e9f7f1c
AE
4587 pool_name = kstrdup(pool_name, GFP_KERNEL);
4588 if (!pool_name)
9e15b77d
AE
4589 return -ENOMEM;
4590
4591 /* Fetch the image name; tolerate failure here */
4592
2e9f7f1c
AE
4593 image_name = rbd_dev_image_name(rbd_dev);
4594 if (!image_name)
06ecc6cb 4595 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4596
04077599 4597 /* Fetch the snapshot name */
9e15b77d 4598
2e9f7f1c 4599 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
4600 if (IS_ERR(snap_name)) {
4601 ret = PTR_ERR(snap_name);
9e15b77d 4602 goto out_err;
2e9f7f1c
AE
4603 }
4604
4605 spec->pool_name = pool_name;
4606 spec->image_name = image_name;
4607 spec->snap_name = snap_name;
9e15b77d
AE
4608
4609 return 0;
04077599 4610
9e15b77d 4611out_err:
2e9f7f1c
AE
4612 kfree(image_name);
4613 kfree(pool_name);
9e15b77d
AE
4614 return ret;
4615}
4616
cc4a38bd 4617static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4618{
4619 size_t size;
4620 int ret;
4621 void *reply_buf;
4622 void *p;
4623 void *end;
4624 u64 seq;
4625 u32 snap_count;
4626 struct ceph_snap_context *snapc;
4627 u32 i;
4628
4629 /*
4630 * We'll need room for the seq value (maximum snapshot id),
4631 * snapshot count, and array of that many snapshot ids.
4632 * For now we have a fixed upper limit on the number we're
4633 * prepared to receive.
4634 */
4635 size = sizeof (__le64) + sizeof (__le32) +
4636 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4637 reply_buf = kzalloc(size, GFP_KERNEL);
4638 if (!reply_buf)
4639 return -ENOMEM;
4640
c41d13a3 4641 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 4642 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4643 reply_buf, size);
36be9a76 4644 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4645 if (ret < 0)
4646 goto out;
4647
35d489f9 4648 p = reply_buf;
57385b51
AE
4649 end = reply_buf + ret;
4650 ret = -ERANGE;
35d489f9
AE
4651 ceph_decode_64_safe(&p, end, seq, out);
4652 ceph_decode_32_safe(&p, end, snap_count, out);
4653
4654 /*
4655 * Make sure the reported number of snapshot ids wouldn't go
4656 * beyond the end of our buffer. But before checking that,
4657 * make sure the computed size of the snapshot context we
4658 * allocate is representable in a size_t.
4659 */
4660 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4661 / sizeof (u64)) {
4662 ret = -EINVAL;
4663 goto out;
4664 }
4665 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4666 goto out;
468521c1 4667 ret = 0;
35d489f9 4668
812164f8 4669 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4670 if (!snapc) {
4671 ret = -ENOMEM;
4672 goto out;
4673 }
35d489f9 4674 snapc->seq = seq;
35d489f9
AE
4675 for (i = 0; i < snap_count; i++)
4676 snapc->snaps[i] = ceph_decode_64(&p);
4677
49ece554 4678 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4679 rbd_dev->header.snapc = snapc;
4680
4681 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4682 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4683out:
4684 kfree(reply_buf);
4685
57385b51 4686 return ret;
35d489f9
AE
4687}
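/*
 * Editor's note: the get_snapcontext reply decoded above has this
 * layout:
 *
 *	__le64 seq;			maximum snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids
 */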
4688
54cac61f
AE
4689static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4690 u64 snap_id)
b8b1e2db
AE
4691{
4692 size_t size;
4693 void *reply_buf;
54cac61f 4694 __le64 snapid;
b8b1e2db
AE
4695 int ret;
4696 void *p;
4697 void *end;
b8b1e2db
AE
4698 char *snap_name;
4699
4700 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4701 reply_buf = kmalloc(size, GFP_KERNEL);
4702 if (!reply_buf)
4703 return ERR_PTR(-ENOMEM);
4704
54cac61f 4705 snapid = cpu_to_le64(snap_id);
c41d13a3 4706 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b8b1e2db 4707 "rbd", "get_snapshot_name",
54cac61f 4708 &snapid, sizeof (snapid),
e2a58ee5 4709 reply_buf, size);
36be9a76 4710 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4711 if (ret < 0) {
4712 snap_name = ERR_PTR(ret);
b8b1e2db 4713 goto out;
f40eb349 4714 }
b8b1e2db
AE
4715
4716 p = reply_buf;
f40eb349 4717 end = reply_buf + ret;
e5c35534 4718 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4719 if (IS_ERR(snap_name))
b8b1e2db 4720 goto out;
b8b1e2db 4721
f40eb349 4722 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4723 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4724out:
4725 kfree(reply_buf);
4726
f40eb349 4727 return snap_name;
b8b1e2db
AE
4728}
4729
2df3fac7 4730static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4731{
2df3fac7 4732 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4733 int ret;
117973fb 4734
1617e40c
JD
4735 ret = rbd_dev_v2_image_size(rbd_dev);
4736 if (ret)
cfbf6377 4737 return ret;
1617e40c 4738
2df3fac7
AE
4739 if (first_time) {
4740 ret = rbd_dev_v2_header_onetime(rbd_dev);
4741 if (ret)
cfbf6377 4742 return ret;
2df3fac7
AE
4743 }
4744
cc4a38bd 4745 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
4746 if (ret && first_time) {
4747 kfree(rbd_dev->header.object_prefix);
4748 rbd_dev->header.object_prefix = NULL;
4749 }
117973fb
AE
4750
4751 return ret;
4752}
4753
a720ae09
ID
4754static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4755{
4756 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4757
4758 if (rbd_dev->image_format == 1)
4759 return rbd_dev_v1_header_info(rbd_dev);
4760
4761 return rbd_dev_v2_header_info(rbd_dev);
4762}
4763
e28fff26
AE
4764/*
4765 * Skips over white space at *buf, and updates *buf to point to the
4766 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4767 * the token (string of non-white space characters) found. Note
4768 * that *buf must be terminated with '\0'.
e28fff26
AE
4769 */
4770static inline size_t next_token(const char **buf)
4771{
4772 /*
4773 * These are the characters that produce nonzero for
4774 * isspace() in the "C" and "POSIX" locales.
4775 */
4776 const char *spaces = " \f\n\r\t\v";
4777
4778 *buf += strspn(*buf, spaces); /* Find start of token */
4779
4780 return strcspn(*buf, spaces); /* Return token length */
4781}
4782
ea3352f4
AE
4783/*
4784 * Finds the next token in *buf, dynamically allocates a buffer big
4785 * enough to hold a copy of it, and copies the token into the new
4786 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4787 * that a duplicate buffer is created even for a zero-length token.
4788 *
4789 * Returns a pointer to the newly-allocated duplicate, or a null
4790 * pointer if memory for the duplicate was not available. If
4791 * the lenp argument is a non-null pointer, the length of the token
4792 * (not including the '\0') is returned in *lenp.
4793 *
4794 * If successful, the *buf pointer will be updated to point beyond
4795 * the end of the found token.
4796 *
4797 * Note: uses GFP_KERNEL for allocation.
4798 */
4799static inline char *dup_token(const char **buf, size_t *lenp)
4800{
4801 char *dup;
4802 size_t len;
4803
4804 len = next_token(buf);
4caf35f9 4805 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4806 if (!dup)
4807 return NULL;
ea3352f4
AE
4808 *(dup + len) = '\0';
4809 *buf += len;
4810
4811 if (lenp)
4812 *lenp = len;
4813
4814 return dup;
4815}
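/*
 * Editor's note: a short walk-through of the two helpers above, on a
 * hypothetical input:
 *
 *	const char *buf = "  1.2.3.4:6789 name=admin rbd foo -";
 *	size_t len;
 *	char *mon = dup_token(&buf, &len);
 *
 * mon is now "1.2.3.4:6789", len is 12, and buf points at the space
 * before "name=admin", ready for the next call.
 */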
4816
a725f65e 4817/*
859c31df
AE
4818 * Parse the options provided for an "rbd add" (i.e., rbd image
4819 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4820 * and the data written is passed here via a NUL-terminated buffer.
4821 * Returns 0 if successful or an error code otherwise.
d22f76e7 4822 *
859c31df
AE
4823 * The information extracted from these options is recorded in
4824 * the other parameters which return dynamically-allocated
4825 * structures:
4826 * ceph_opts
4827 * The address of a pointer that will refer to a ceph options
4828 * structure. Caller must release the returned pointer using
4829 * ceph_destroy_options() when it is no longer needed.
4830 * rbd_opts
4831 * Address of an rbd options pointer. Fully initialized by
4832 * this function; caller must release with kfree().
4833 * spec
4834 * Address of an rbd image specification pointer. Fully
4835 * initialized by this function based on parsed options.
4836 * Caller must release with rbd_spec_put().
4837 *
4838 * The options passed take this form:
4839 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4840 * where:
4841 * <mon_addrs>
4842 * A comma-separated list of one or more monitor addresses.
4843 * A monitor address is an ip address, optionally followed
4844 * by a port number (separated by a colon).
4845 * I.e.: ip1[:port1][,ip2[:port2]...]
4846 * <options>
4847 * A comma-separated list of ceph and/or rbd options.
4848 * <pool_name>
4849 * The name of the rados pool containing the rbd image.
4850 * <image_name>
4851 * The name of the image in that pool to map.
4852 * <snap_name>
4853 * An optional snapshot name. If provided, the mapping will
4854 * present data from the image at the time that snapshot was
4855 * created. The image head is used if no snapshot name is
4856 * provided. Snapshot mappings are always read-only.
a725f65e 4857 */
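/*
 * Editor's note: a concrete (hypothetical) instance of the format
 * described above, as written to /sys/bus/rbd/add:
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage -
 *
 * one monitor address, ceph options, pool "rbd", image "myimage", and
 * "-" to map the image head rather than a snapshot.
 */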
859c31df 4858static int rbd_add_parse_args(const char *buf,
dc79b113 4859 struct ceph_options **ceph_opts,
859c31df
AE
4860 struct rbd_options **opts,
4861 struct rbd_spec **rbd_spec)
e28fff26 4862{
d22f76e7 4863 size_t len;
859c31df 4864 char *options;
0ddebc0c 4865 const char *mon_addrs;
ecb4dc22 4866 char *snap_name;
0ddebc0c 4867 size_t mon_addrs_size;
859c31df 4868 struct rbd_spec *spec = NULL;
4e9afeba 4869 struct rbd_options *rbd_opts = NULL;
859c31df 4870 struct ceph_options *copts;
dc79b113 4871 int ret;
e28fff26
AE
4872
4873 /* The first four tokens are required */
4874
7ef3214a 4875 len = next_token(&buf);
4fb5d671
AE
4876 if (!len) {
4877 rbd_warn(NULL, "no monitor address(es) provided");
4878 return -EINVAL;
4879 }
0ddebc0c 4880 mon_addrs = buf;
f28e565a 4881 mon_addrs_size = len + 1;
7ef3214a 4882 buf += len;
a725f65e 4883
dc79b113 4884 ret = -EINVAL;
f28e565a
AE
4885 options = dup_token(&buf, NULL);
4886 if (!options)
dc79b113 4887 return -ENOMEM;
4fb5d671
AE
4888 if (!*options) {
4889 rbd_warn(NULL, "no options provided");
4890 goto out_err;
4891 }
e28fff26 4892
859c31df
AE
4893 spec = rbd_spec_alloc();
4894 if (!spec)
f28e565a 4895 goto out_mem;
859c31df
AE
4896
4897 spec->pool_name = dup_token(&buf, NULL);
4898 if (!spec->pool_name)
4899 goto out_mem;
4fb5d671
AE
4900 if (!*spec->pool_name) {
4901 rbd_warn(NULL, "no pool name provided");
4902 goto out_err;
4903 }
e28fff26 4904
69e7a02f 4905 spec->image_name = dup_token(&buf, NULL);
859c31df 4906 if (!spec->image_name)
f28e565a 4907 goto out_mem;
4fb5d671
AE
4908 if (!*spec->image_name) {
4909 rbd_warn(NULL, "no image name provided");
4910 goto out_err;
4911 }
d4b125e9 4912
f28e565a
AE
4913 /*
4914 * Snapshot name is optional; default is to use "-"
4915 * (indicating the head/no snapshot).
4916 */
3feeb894 4917 len = next_token(&buf);
820a5f3e 4918 if (!len) {
3feeb894
AE
4919 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4920 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4921 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4922 ret = -ENAMETOOLONG;
f28e565a 4923 goto out_err;
849b4260 4924 }
ecb4dc22
AE
4925 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4926 if (!snap_name)
f28e565a 4927 goto out_mem;
ecb4dc22
AE
4928 *(snap_name + len) = '\0';
4929 spec->snap_name = snap_name;
e5c35534 4930
0ddebc0c 4931 /* Initialize all rbd options to the defaults */
e28fff26 4932
4e9afeba
AE
4933 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4934 if (!rbd_opts)
4935 goto out_mem;
4936
4937 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 4938 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
d22f76e7 4939
859c31df 4940 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4941 mon_addrs + mon_addrs_size - 1,
4e9afeba 4942 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4943 if (IS_ERR(copts)) {
4944 ret = PTR_ERR(copts);
dc79b113
AE
4945 goto out_err;
4946 }
859c31df
AE
4947 kfree(options);
4948
4949 *ceph_opts = copts;
4e9afeba 4950 *opts = rbd_opts;
859c31df 4951 *rbd_spec = spec;
0ddebc0c 4952
dc79b113 4953 return 0;
f28e565a 4954out_mem:
dc79b113 4955 ret = -ENOMEM;
d22f76e7 4956out_err:
859c31df
AE
4957 kfree(rbd_opts);
4958 rbd_spec_put(spec);
f28e565a 4959 kfree(options);
d22f76e7 4960
dc79b113 4961 return ret;
a725f65e
AE
4962}
4963
30ba1f02
ID
4964/*
4965 * Return pool id (>= 0) or a negative error code.
4966 */
4967static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4968{
a319bf56 4969 struct ceph_options *opts = rbdc->client->options;
30ba1f02 4970 u64 newest_epoch;
30ba1f02
ID
4971 int tries = 0;
4972 int ret;
4973
4974again:
4975 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
4976 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
4977 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
4978 &newest_epoch);
30ba1f02
ID
4979 if (ret < 0)
4980 return ret;
4981
4982 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 4983 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 4984 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
4985 newest_epoch,
4986 opts->mount_timeout);
30ba1f02
ID
4987 goto again;
4988 } else {
4989 /* the osdmap we have is new enough */
4990 return -ENOENT;
4991 }
4992 }
4993
4994 return ret;
4995}
4996
589d30e0
AE
4997/*
4998 * An rbd format 2 image has a unique identifier, distinct from the
4999 * name given to it by the user. Internally, that identifier is
5000 * what's used to specify the names of objects related to the image.
5001 *
5002 * A special "rbd id" object is used to map an rbd image name to its
5003 * id. If that object doesn't exist, then there is no v2 rbd image
5004 * with the supplied name.
5005 *
5006 * This function will record the given rbd_dev's image_id field if
5007 * it can be determined, and in that case will return 0. If any
5008 * errors occur a negative errno will be returned and the rbd_dev's
5009 * image_id field will be unchanged (and should be NULL).
5010 */
5011static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5012{
5013 int ret;
5014 size_t size;
5015 char *object_name;
5016 void *response;
c0fba368 5017 char *image_id;
2f82ee54 5018
2c0d0a10
AE
5019 /*
5020 * When probing a parent image, the image id is already
5021 * known (and the image name likely is not). There's no
c0fba368
AE
5022 * need to fetch the image id again in this case. We
5023 * do still need to set the image format though.
2c0d0a10 5024 */
c0fba368
AE
5025 if (rbd_dev->spec->image_id) {
5026 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5027
2c0d0a10 5028 return 0;
c0fba368 5029 }
2c0d0a10 5030
589d30e0
AE
5031 /*
5032 * First, see if the format 2 image id file exists, and if
5033 * so, get the image's persistent id from it.
5034 */
69e7a02f 5035 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
5036 object_name = kmalloc(size, GFP_NOIO);
5037 if (!object_name)
5038 return -ENOMEM;
0d7dbfce 5039 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
5040 dout("rbd id object name is %s\n", object_name);
5041
5042 /* Response will be an encoded string, which includes a length */
5043
5044 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5045 response = kzalloc(size, GFP_NOIO);
5046 if (!response) {
5047 ret = -ENOMEM;
5048 goto out;
5049 }
5050
c0fba368
AE
5051 /* If it doesn't exist we'll assume it's a format 1 image */
5052
36be9a76 5053 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 5054 "rbd", "get_id", NULL, 0,
e2a58ee5 5055 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5056 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5057 if (ret == -ENOENT) {
5058 image_id = kstrdup("", GFP_KERNEL);
5059 ret = image_id ? 0 : -ENOMEM;
5060 if (!ret)
5061 rbd_dev->image_format = 1;
7dd440c9 5062 } else if (ret >= 0) {
c0fba368
AE
5063 void *p = response;
5064
5065 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5066 NULL, GFP_NOIO);
461f758a 5067 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5068 if (!ret)
5069 rbd_dev->image_format = 2;
c0fba368
AE
5070 }
5071
5072 if (!ret) {
5073 rbd_dev->spec->image_id = image_id;
5074 dout("image_id is %s\n", image_id);
589d30e0
AE
5075 }
5076out:
5077 kfree(response);
5078 kfree(object_name);
5079
5080 return ret;
5081}
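/*
 * Editor's note: assuming RBD_ID_PREFIX is "rbd_id." (per
 * rbd_types.h), probing image "foo" looks up object "rbd_id.foo" and
 * invokes its "rbd"/"get_id" class method; -ENOENT means no such
 * object exists, i.e. a format 1 image, and image_format is recorded
 * accordingly.
 */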
5082
3abef3b3
AE
5083/*
5084 * Undo whatever state changes are made by v1 or v2 header info
5085 * call.
5086 */
6fd48b3b
AE
5087static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5088{
5089 struct rbd_image_header *header;
5090
e69b8d41 5091 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5092
5093 /* Free dynamic fields from the header, then zero it out */
5094
5095 header = &rbd_dev->header;
812164f8 5096 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5097 kfree(header->snap_sizes);
5098 kfree(header->snap_names);
5099 kfree(header->object_prefix);
5100 memset(header, 0, sizeof (*header));
5101}
5102
2df3fac7 5103static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5104{
5105 int ret;
a30b71b9 5106
1e130199 5107 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5108 if (ret)
b1b5402a
AE
5109 goto out_err;
5110
2df3fac7
AE
5111 /*
5112 * Get and check the features for the image. Currently the
5113 * features are assumed to never change.
5114 */
b1b5402a 5115 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5116 if (ret)
9d475de5 5117 goto out_err;
35d489f9 5118
cc070d59
AE
5119 /* If the image supports fancy striping, get its parameters */
5120
5121 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5122 ret = rbd_dev_v2_striping_info(rbd_dev);
5123 if (ret < 0)
5124 goto out_err;
5125 }
2df3fac7 5126 /* No support for crypto and compression type format 2 images */
a30b71b9 5127
35152979 5128 return 0;
9d475de5 5129out_err:
642a2537 5130 rbd_dev->header.features = 0;
1e130199
AE
5131 kfree(rbd_dev->header.object_prefix);
5132 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
5133
5134 return ret;
a30b71b9
AE
5135}
5136
/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}

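/*
 * Illustrative only: mapping a clone whose ancestry is
 * base <- clone1 <- clone2 probes clone2 at depth 0, clone1 at
 * depth 1 and the base image at depth 2; the walk is cut off with
 * -EINVAL once depth would exceed RBD_MAX_PARENT_CHAIN_LEN.
 */
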
/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	ret = device_add(&rbd_dev->dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	add_disk(rbd_dev->disk);
	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

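/*
 * A note on device numbering (illustrative): without single_major,
 * each mapping gets its own dynamically allocated block major with
 * minor 0.  With single_major=Y all mappings share rbd_major and
 * rbd_dev_id_to_minor() shifts the device id left by
 * RBD_SINGLE_MAJOR_PART_SHIFT to leave room for partition minors
 * (e.g. dev_id 2 -> minor 32 for the whole disk).
 */
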
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

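/*
 * For reference (prefix/suffix values as defined in rbd_types.h):
 * a format 1 image named "foo" keeps its header in object "foo.rbd",
 * while a format 2 image with id "1028bf2549e3" keeps it in
 * "rbd_header.1028bf2549e3".  The id here is illustrative.
 */
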
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;

		/*
		 * Need to warn users if this image is the one being
		 * mapped and has a parent.
		 */
		if (!depth && rbd_dev->parent_spec)
			rbd_warn(rbd_dev,
				 "WARNING: kernel layering is EXPERIMENTAL!");
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
	     rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

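/*
 * The probe sequence above, in short: determine the image id and
 * format, derive the header object name, register a watch (top-level
 * mapping only), read the header, resolve the missing half of the
 * pool/image/snap naming, then recurse into the parent chain.
 */
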
static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	bool read_only;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0)
		goto err_out_rbd_dev;

	/* If we are mapping a snapshot it must be marked read-only */

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		/*
		 * rbd_unregister_watch() can't be moved into
		 * rbd_dev_image_release() without refactoring, see
		 * commit 1f3ef78861ac.
		 */
		rbd_unregister_watch(rbd_dev);
		rbd_dev_image_release(rbd_dev);
		goto out;
	}

	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_rbd_dev:
	up_write(&rbd_dev->header_rwsem);
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

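/*
 * Illustrative sysfs usage (see Documentation/ABI/testing/sysfs-bus-rbd
 * for the authoritative syntax); the monitor address, key and names
 * below are made up:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo" \
 *         > /sys/bus/rbd/add
 *
 * i.e. "<mon addrs> <options> <pool name> <image name> [<snap name>]".
 */
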
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	rbd_free_disk(rbd_dev);

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	device_del(&rbd_dev->dev);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

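/*
 * Illustrative walk: for a chain dev -> P1 -> P2 (P2 being the base
 * image), the outer loop first releases P2 and detaches it from P1,
 * then releases P1 and detaches it from dev, always tearing down the
 * deepest remaining ancestor first.
 */
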
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = kstrtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
						   &rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	rbd_unregister_watch(rbd_dev);

	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed.  Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);

	return count;
}

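/*
 * Illustrative sysfs usage: unmap device id 2 (i.e. /dev/rbd2) with
 *
 *   $ echo 2 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device is held open.
 */
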
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

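/*
 * Illustrative: loading the module with "modprobe rbd single_major=Y"
 * makes all mappings share the one rbd_major registered above and
 * routes add/remove through the *_single_major attributes.
 */
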
static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");