libceph: don't WARN() if user tries to add invalid key
[linux-block.git] / drivers / block / rbd.c
CommitLineData
e2a58ee5 1
602adf40
YS
2/*
3 rbd.c -- Export ceph rados objects as a Linux block device
4
5
6 based on drivers/block/osdblk.c:
7
8 Copyright 2009 Red Hat, Inc.
9
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
dfc5606d 25 For usage instructions, please refer to:
602adf40 26
dfc5606d 27 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
28
29 */
30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
ed95b21a 34#include <linux/ceph/cls_lock_client.h>
602adf40 35#include <linux/ceph/decode.h>
59c2be1e 36#include <linux/parser.h>
30d1cff8 37#include <linux/bsearch.h>
602adf40
YS
38
39#include <linux/kernel.h>
40#include <linux/device.h>
41#include <linux/module.h>
7ad18afa 42#include <linux/blk-mq.h>
602adf40
YS
43#include <linux/fs.h>
44#include <linux/blkdev.h>
1c2a9dfe 45#include <linux/slab.h>
f8a22fc2 46#include <linux/idr.h>
bc1ecc65 47#include <linux/workqueue.h>
602adf40
YS
48
49#include "rbd_types.h"
50
aafb230e
AE
51#define RBD_DEBUG /* Activate rbd_assert() calls */
52
593a9e7b
AE
53/*
54 * The basic unit of block I/O is a sector. It is interpreted in a
55 * number of contexts in Linux (blk, bio, genhd), but the default is
56 * universally 512 bytes. These symbols are just slightly more
57 * meaningful than the bare numbers they represent.
58 */
59#define SECTOR_SHIFT 9
60#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
61
a2acd00e
AE
62/*
63 * Increment the given counter and return its updated value.
64 * If the counter is already 0 it will not be incremented.
65 * If the counter is already at its maximum value returns
66 * -EINVAL without updating it.
67 */
68static int atomic_inc_return_safe(atomic_t *v)
69{
70 unsigned int counter;
71
72 counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73 if (counter <= (unsigned int)INT_MAX)
74 return (int)counter;
75
76 atomic_dec(v);
77
78 return -EINVAL;
79}
80
81/* Decrement the counter. Return the resulting value, or -EINVAL */
82static int atomic_dec_return_safe(atomic_t *v)
83{
84 int counter;
85
86 counter = atomic_dec_return(v);
87 if (counter >= 0)
88 return counter;
89
90 atomic_inc(v);
91
92 return -EINVAL;
93}
94
f0f8cef5 95#define RBD_DRV_NAME "rbd"
602adf40 96
7e513d43
ID
97#define RBD_MINORS_PER_MAJOR 256
98#define RBD_SINGLE_MAJOR_PART_SHIFT 4
602adf40 99
6d69bb53
ID
100#define RBD_MAX_PARENT_CHAIN_LEN 16
101
d4b125e9
AE
102#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
103#define RBD_MAX_SNAP_NAME_LEN \
104 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
105
35d489f9 106#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
107
108#define RBD_SNAP_HEAD_NAME "-"
109
9682fc6d
AE
110#define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
111
9e15b77d
AE
112/* This allows a single page to hold an image name sent by OSD */
113#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 114#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 115
1e130199 116#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 117
ed95b21a 118#define RBD_NOTIFY_TIMEOUT 5 /* seconds */
99d16943
ID
119#define RBD_RETRY_DELAY msecs_to_jiffies(1000)
120
d889140c
AE
121/* Feature bits */
122
8767b293
ID
123#define RBD_FEATURE_LAYERING (1ULL<<0)
124#define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
125#define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
126#define RBD_FEATURE_DATA_POOL (1ULL<<7)
127
ed95b21a
ID
128#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
129 RBD_FEATURE_STRIPINGV2 | \
7e97332e
ID
130 RBD_FEATURE_EXCLUSIVE_LOCK | \
131 RBD_FEATURE_DATA_POOL)
d889140c
AE
132
133/* Features supported by this (client software) implementation. */
134
770eba6e 135#define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
d889140c 136
81a89793
AE
137/*
138 * An RBD device name will be "rbd#", where the "rbd" comes from
139 * RBD_DRV_NAME above, and # is a unique integer identifier.
81a89793 140 */
602adf40
YS
141#define DEV_NAME_LEN 32
142
143/*
144 * block device image metadata (in-memory version)
145 */
146struct rbd_image_header {
f35a4dee 147 /* These six fields never change for a given rbd image */
849b4260 148 char *object_prefix;
602adf40 149 __u8 obj_order;
f35a4dee
AE
150 u64 stripe_unit;
151 u64 stripe_count;
7e97332e 152 s64 data_pool_id;
f35a4dee 153 u64 features; /* Might be changeable someday? */
602adf40 154
f84344f3
AE
155 /* The remaining fields need to be updated occasionally */
156 u64 image_size;
157 struct ceph_snap_context *snapc;
f35a4dee
AE
158 char *snap_names; /* format 1 only */
159 u64 *snap_sizes; /* format 1 only */
59c2be1e
YS
160};
161
0d7dbfce
AE
162/*
163 * An rbd image specification.
164 *
165 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
c66c6e0c
AE
166 * identify an image. Each rbd_dev structure includes a pointer to
167 * an rbd_spec structure that encapsulates this identity.
168 *
169 * Each of the id's in an rbd_spec has an associated name. For a
170 * user-mapped image, the names are supplied and the id's associated
171 * with them are looked up. For a layered image, a parent image is
172 * defined by the tuple, and the names are looked up.
173 *
174 * An rbd_dev structure contains a parent_spec pointer which is
175 * non-null if the image it represents is a child in a layered
176 * image. This pointer will refer to the rbd_spec structure used
177 * by the parent rbd_dev for its own identity (i.e., the structure
178 * is shared between the parent and child).
179 *
180 * Since these structures are populated once, during the discovery
181 * phase of image construction, they are effectively immutable so
182 * we make no effort to synchronize access to them.
183 *
184 * Note that code herein does not assume the image name is known (it
185 * could be a null pointer).
0d7dbfce
AE
186 */
187struct rbd_spec {
188 u64 pool_id;
ecb4dc22 189 const char *pool_name;
0d7dbfce 190
ecb4dc22
AE
191 const char *image_id;
192 const char *image_name;
0d7dbfce
AE
193
194 u64 snap_id;
ecb4dc22 195 const char *snap_name;
0d7dbfce
AE
196
197 struct kref kref;
198};
199
602adf40 200/*
f0f8cef5 201 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
202 */
203struct rbd_client {
204 struct ceph_client *client;
205 struct kref kref;
206 struct list_head node;
207};
208
bf0d5f50
AE
209struct rbd_img_request;
210typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
211
212#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
213
214struct rbd_obj_request;
215typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
216
9969ebc5
AE
217enum obj_request_type {
218 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
219};
bf0d5f50 220
6d2940c8
GZ
221enum obj_operation_type {
222 OBJ_OP_WRITE,
223 OBJ_OP_READ,
90e98c52 224 OBJ_OP_DISCARD,
6d2940c8
GZ
225};
226
926f9b3f
AE
227enum obj_req_flags {
228 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
6365d33a 229 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
5679c59f
AE
230 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
231 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
926f9b3f
AE
232};
233
bf0d5f50 234struct rbd_obj_request {
a90bb0c1 235 u64 object_no;
bf0d5f50
AE
236 u64 offset; /* object start byte */
237 u64 length; /* bytes from offset */
926f9b3f 238 unsigned long flags;
bf0d5f50 239
c5b5ef6c
AE
240 /*
241 * An object request associated with an image will have its
242 * img_data flag set; a standalone object request will not.
243 *
244 * A standalone object request will have which == BAD_WHICH
245 * and a null obj_request pointer.
246 *
247 * An object request initiated in support of a layered image
248 * object (to check for its existence before a write) will
249 * have which == BAD_WHICH and a non-null obj_request pointer.
250 *
251 * Finally, an object request for rbd image data will have
252 * which != BAD_WHICH, and will have a non-null img_request
253 * pointer. The value of which will be in the range
254 * 0..(img_request->obj_request_count-1).
255 */
256 union {
257 struct rbd_obj_request *obj_request; /* STAT op */
258 struct {
259 struct rbd_img_request *img_request;
260 u64 img_offset;
261 /* links for img_request->obj_requests list */
262 struct list_head links;
263 };
264 };
bf0d5f50
AE
265 u32 which; /* posn image request list */
266
267 enum obj_request_type type;
788e2df3
AE
268 union {
269 struct bio *bio_list;
270 struct {
271 struct page **pages;
272 u32 page_count;
273 };
274 };
0eefd470 275 struct page **copyup_pages;
ebda6408 276 u32 copyup_page_count;
bf0d5f50
AE
277
278 struct ceph_osd_request *osd_req;
279
280 u64 xferred; /* bytes transferred */
1b83bef2 281 int result;
bf0d5f50
AE
282
283 rbd_obj_callback_t callback;
788e2df3 284 struct completion completion;
bf0d5f50
AE
285
286 struct kref kref;
287};
288
0c425248 289enum img_req_flags {
9849e986
AE
290 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
291 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
d0b2e944 292 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
90e98c52 293 IMG_REQ_DISCARD, /* discard: normal = 0, discard request = 1 */
0c425248
AE
294};
295
bf0d5f50 296struct rbd_img_request {
bf0d5f50
AE
297 struct rbd_device *rbd_dev;
298 u64 offset; /* starting image byte offset */
299 u64 length; /* byte count from offset */
0c425248 300 unsigned long flags;
bf0d5f50 301 union {
9849e986 302 u64 snap_id; /* for reads */
bf0d5f50 303 struct ceph_snap_context *snapc; /* for writes */
9849e986
AE
304 };
305 union {
306 struct request *rq; /* block request */
307 struct rbd_obj_request *obj_request; /* obj req initiator */
bf0d5f50 308 };
3d7efd18 309 struct page **copyup_pages;
ebda6408 310 u32 copyup_page_count;
bf0d5f50
AE
311 spinlock_t completion_lock;/* protects next_completion */
312 u32 next_completion;
313 rbd_img_callback_t callback;
55f27e09 314 u64 xferred;/* aggregate bytes transferred */
a5a337d4 315 int result; /* first nonzero obj_request result */
bf0d5f50
AE
316
317 u32 obj_request_count;
318 struct list_head obj_requests; /* rbd_obj_request structs */
319
320 struct kref kref;
321};
322
323#define for_each_obj_request(ireq, oreq) \
ef06f4d3 324 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
bf0d5f50 325#define for_each_obj_request_from(ireq, oreq) \
ef06f4d3 326 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
bf0d5f50 327#define for_each_obj_request_safe(ireq, oreq, n) \
ef06f4d3 328 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
bf0d5f50 329
99d16943
ID
330enum rbd_watch_state {
331 RBD_WATCH_STATE_UNREGISTERED,
332 RBD_WATCH_STATE_REGISTERED,
333 RBD_WATCH_STATE_ERROR,
334};
335
ed95b21a
ID
336enum rbd_lock_state {
337 RBD_LOCK_STATE_UNLOCKED,
338 RBD_LOCK_STATE_LOCKED,
339 RBD_LOCK_STATE_RELEASING,
340};
341
342/* WatchNotify::ClientId */
343struct rbd_client_id {
344 u64 gid;
345 u64 handle;
346};
347
f84344f3 348struct rbd_mapping {
99c1f08f 349 u64 size;
34b13184 350 u64 features;
f84344f3
AE
351};
352
602adf40
YS
353/*
354 * a single device
355 */
356struct rbd_device {
de71a297 357 int dev_id; /* blkdev unique id */
602adf40
YS
358
359 int major; /* blkdev assigned major */
dd82fff1 360 int minor;
602adf40 361 struct gendisk *disk; /* blkdev's gendisk and rq */
602adf40 362
a30b71b9 363 u32 image_format; /* Either 1 or 2 */
602adf40
YS
364 struct rbd_client *rbd_client;
365
366 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
367
b82d167b 368 spinlock_t lock; /* queue, flags, open_count */
602adf40
YS
369
370 struct rbd_image_header header;
b82d167b 371 unsigned long flags; /* possibly lock protected */
0d7dbfce 372 struct rbd_spec *spec;
d147543d 373 struct rbd_options *opts;
0d6d1e9c 374 char *config_info; /* add{,_single_major} string */
602adf40 375
c41d13a3 376 struct ceph_object_id header_oid;
922dab61 377 struct ceph_object_locator header_oloc;
971f839a 378
1643dfa4 379 struct ceph_file_layout layout; /* used for all rbd requests */
0903e875 380
99d16943
ID
381 struct mutex watch_mutex;
382 enum rbd_watch_state watch_state;
922dab61 383 struct ceph_osd_linger_request *watch_handle;
99d16943
ID
384 u64 watch_cookie;
385 struct delayed_work watch_dwork;
59c2be1e 386
ed95b21a
ID
387 struct rw_semaphore lock_rwsem;
388 enum rbd_lock_state lock_state;
cbbfb0ff 389 char lock_cookie[32];
ed95b21a
ID
390 struct rbd_client_id owner_cid;
391 struct work_struct acquired_lock_work;
392 struct work_struct released_lock_work;
393 struct delayed_work lock_dwork;
394 struct work_struct unlock_work;
395 wait_queue_head_t lock_waitq;
396
1643dfa4 397 struct workqueue_struct *task_wq;
59c2be1e 398
86b00e0d
AE
399 struct rbd_spec *parent_spec;
400 u64 parent_overlap;
a2acd00e 401 atomic_t parent_ref;
2f82ee54 402 struct rbd_device *parent;
86b00e0d 403
7ad18afa
CH
404 /* Block layer tags. */
405 struct blk_mq_tag_set tag_set;
406
c666601a
JD
407 /* protects updating the header */
408 struct rw_semaphore header_rwsem;
f84344f3
AE
409
410 struct rbd_mapping mapping;
602adf40
YS
411
412 struct list_head node;
dfc5606d 413
dfc5606d
YS
414 /* sysfs related */
415 struct device dev;
b82d167b 416 unsigned long open_count; /* protected by lock */
dfc5606d
YS
417};
418
b82d167b 419/*
87c0fded
ID
420 * Flag bits for rbd_dev->flags:
421 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
422 * by rbd_dev->lock
423 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
b82d167b 424 */
6d292906
AE
425enum rbd_dev_flags {
426 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
b82d167b 427 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
87c0fded 428 RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
6d292906
AE
429};
430
cfbf6377 431static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
e124a82f 432
602adf40 433static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
434static DEFINE_SPINLOCK(rbd_dev_list_lock);
435
432b8587
AE
436static LIST_HEAD(rbd_client_list); /* clients */
437static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 438
78c2a44a
AE
439/* Slab caches for frequently-allocated structures */
440
1c2a9dfe 441static struct kmem_cache *rbd_img_request_cache;
868311b1 442static struct kmem_cache *rbd_obj_request_cache;
1c2a9dfe 443
f856dc36
N
444static struct bio_set *rbd_bio_clone;
445
9b60e70b 446static int rbd_major;
f8a22fc2
ID
447static DEFINE_IDA(rbd_dev_id_ida);
448
f5ee37bd
ID
449static struct workqueue_struct *rbd_wq;
450
9b60e70b
ID
451/*
452 * Default to false for now, as single-major requires >= 0.75 version of
453 * userspace rbd utility.
454 */
455static bool single_major = false;
456module_param(single_major, bool, S_IRUGO);
457MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
458
3d7efd18
AE
459static int rbd_img_request_submit(struct rbd_img_request *img_request);
460
f0f8cef5
AE
461static ssize_t rbd_add(struct bus_type *bus, const char *buf,
462 size_t count);
463static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
464 size_t count);
9b60e70b
ID
465static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
466 size_t count);
467static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
468 size_t count);
6d69bb53 469static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
a2acd00e 470static void rbd_spec_put(struct rbd_spec *spec);
f0f8cef5 471
9b60e70b
ID
472static int rbd_dev_id_to_minor(int dev_id)
473{
7e513d43 474 return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
9b60e70b
ID
475}
476
477static int minor_to_rbd_dev_id(int minor)
478{
7e513d43 479 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
9b60e70b
ID
480}
481
ed95b21a
ID
482static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
483{
484 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
485 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
486}
487
488static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
489{
490 bool is_lock_owner;
491
492 down_read(&rbd_dev->lock_rwsem);
493 is_lock_owner = __rbd_is_lock_owner(rbd_dev);
494 up_read(&rbd_dev->lock_rwsem);
495 return is_lock_owner;
496}
497
8767b293
ID
498static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
499{
500 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
501}
502
b15a21dd
GKH
503static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
504static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
9b60e70b
ID
505static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
506static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
8767b293 507static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
b15a21dd
GKH
508
509static struct attribute *rbd_bus_attrs[] = {
510 &bus_attr_add.attr,
511 &bus_attr_remove.attr,
9b60e70b
ID
512 &bus_attr_add_single_major.attr,
513 &bus_attr_remove_single_major.attr,
8767b293 514 &bus_attr_supported_features.attr,
b15a21dd 515 NULL,
f0f8cef5 516};
92c76dc0
ID
517
518static umode_t rbd_bus_is_visible(struct kobject *kobj,
519 struct attribute *attr, int index)
520{
9b60e70b
ID
521 if (!single_major &&
522 (attr == &bus_attr_add_single_major.attr ||
523 attr == &bus_attr_remove_single_major.attr))
524 return 0;
525
92c76dc0
ID
526 return attr->mode;
527}
528
529static const struct attribute_group rbd_bus_group = {
530 .attrs = rbd_bus_attrs,
531 .is_visible = rbd_bus_is_visible,
532};
533__ATTRIBUTE_GROUPS(rbd_bus);
f0f8cef5
AE
534
535static struct bus_type rbd_bus_type = {
536 .name = "rbd",
b15a21dd 537 .bus_groups = rbd_bus_groups,
f0f8cef5
AE
538};
539
540static void rbd_root_dev_release(struct device *dev)
541{
542}
543
544static struct device rbd_root_dev = {
545 .init_name = "rbd",
546 .release = rbd_root_dev_release,
547};
548
06ecc6cb
AE
549static __printf(2, 3)
550void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
551{
552 struct va_format vaf;
553 va_list args;
554
555 va_start(args, fmt);
556 vaf.fmt = fmt;
557 vaf.va = &args;
558
559 if (!rbd_dev)
560 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
561 else if (rbd_dev->disk)
562 printk(KERN_WARNING "%s: %s: %pV\n",
563 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
564 else if (rbd_dev->spec && rbd_dev->spec->image_name)
565 printk(KERN_WARNING "%s: image %s: %pV\n",
566 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
567 else if (rbd_dev->spec && rbd_dev->spec->image_id)
568 printk(KERN_WARNING "%s: id %s: %pV\n",
569 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
570 else /* punt */
571 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
572 RBD_DRV_NAME, rbd_dev, &vaf);
573 va_end(args);
574}
575
aafb230e
AE
576#ifdef RBD_DEBUG
577#define rbd_assert(expr) \
578 if (unlikely(!(expr))) { \
579 printk(KERN_ERR "\nAssertion failure in %s() " \
580 "at line %d:\n\n" \
581 "\trbd_assert(%s);\n\n", \
582 __func__, __LINE__, #expr); \
583 BUG(); \
584 }
585#else /* !RBD_DEBUG */
586# define rbd_assert(expr) ((void) 0)
587#endif /* !RBD_DEBUG */
dfc5606d 588
2761713d 589static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
b454e36d 590static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
05a46afd
AE
591static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
592static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
8b3e1a56 593
cc4a38bd 594static int rbd_dev_refresh(struct rbd_device *rbd_dev);
2df3fac7 595static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
a720ae09 596static int rbd_dev_header_info(struct rbd_device *rbd_dev);
e8f59b59 597static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
54cac61f
AE
598static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
599 u64 snap_id);
2ad3d716
AE
600static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
601 u8 *order, u64 *snap_size);
602static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
603 u64 *snap_features);
59c2be1e 604
602adf40
YS
605static int rbd_open(struct block_device *bdev, fmode_t mode)
606{
f0f8cef5 607 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
b82d167b 608 bool removing = false;
602adf40 609
a14ea269 610 spin_lock_irq(&rbd_dev->lock);
b82d167b
AE
611 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
612 removing = true;
613 else
614 rbd_dev->open_count++;
a14ea269 615 spin_unlock_irq(&rbd_dev->lock);
b82d167b
AE
616 if (removing)
617 return -ENOENT;
618
c3e946ce 619 (void) get_device(&rbd_dev->dev);
340c7a2b 620
602adf40
YS
621 return 0;
622}
623
db2a144b 624static void rbd_release(struct gendisk *disk, fmode_t mode)
dfc5606d
YS
625{
626 struct rbd_device *rbd_dev = disk->private_data;
b82d167b
AE
627 unsigned long open_count_before;
628
a14ea269 629 spin_lock_irq(&rbd_dev->lock);
b82d167b 630 open_count_before = rbd_dev->open_count--;
a14ea269 631 spin_unlock_irq(&rbd_dev->lock);
b82d167b 632 rbd_assert(open_count_before > 0);
dfc5606d 633
c3e946ce 634 put_device(&rbd_dev->dev);
dfc5606d
YS
635}
636
131fd9f6
GZ
637static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
638{
1de797bb 639 int ro;
131fd9f6 640
1de797bb 641 if (get_user(ro, (int __user *)arg))
131fd9f6
GZ
642 return -EFAULT;
643
1de797bb 644 /* Snapshots can't be marked read-write */
131fd9f6
GZ
645 if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
646 return -EROFS;
647
1de797bb
ID
648 /* Let blkdev_roset() handle it */
649 return -ENOTTY;
131fd9f6
GZ
650}
651
652static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
653 unsigned int cmd, unsigned long arg)
654{
655 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
1de797bb 656 int ret;
131fd9f6 657
131fd9f6
GZ
658 switch (cmd) {
659 case BLKROSET:
660 ret = rbd_ioctl_set_ro(rbd_dev, arg);
661 break;
662 default:
663 ret = -ENOTTY;
664 }
665
131fd9f6
GZ
666 return ret;
667}
668
669#ifdef CONFIG_COMPAT
670static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
671 unsigned int cmd, unsigned long arg)
672{
673 return rbd_ioctl(bdev, mode, cmd, arg);
674}
675#endif /* CONFIG_COMPAT */
676
602adf40
YS
677static const struct block_device_operations rbd_bd_ops = {
678 .owner = THIS_MODULE,
679 .open = rbd_open,
dfc5606d 680 .release = rbd_release,
131fd9f6
GZ
681 .ioctl = rbd_ioctl,
682#ifdef CONFIG_COMPAT
683 .compat_ioctl = rbd_compat_ioctl,
684#endif
602adf40
YS
685};
686
687/*
7262cfca 688 * Initialize an rbd client instance. Success or not, this function
cfbf6377 689 * consumes ceph_opts. Caller holds client_mutex.
602adf40 690 */
f8c38929 691static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
692{
693 struct rbd_client *rbdc;
694 int ret = -ENOMEM;
695
37206ee5 696 dout("%s:\n", __func__);
602adf40
YS
697 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
698 if (!rbdc)
699 goto out_opt;
700
701 kref_init(&rbdc->kref);
702 INIT_LIST_HEAD(&rbdc->node);
703
74da4a0f 704 rbdc->client = ceph_create_client(ceph_opts, rbdc);
602adf40 705 if (IS_ERR(rbdc->client))
08f75463 706 goto out_rbdc;
43ae4701 707 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
708
709 ret = ceph_open_session(rbdc->client);
710 if (ret < 0)
08f75463 711 goto out_client;
602adf40 712
432b8587 713 spin_lock(&rbd_client_list_lock);
602adf40 714 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 715 spin_unlock(&rbd_client_list_lock);
602adf40 716
37206ee5 717 dout("%s: rbdc %p\n", __func__, rbdc);
bc534d86 718
602adf40 719 return rbdc;
08f75463 720out_client:
602adf40 721 ceph_destroy_client(rbdc->client);
08f75463 722out_rbdc:
602adf40
YS
723 kfree(rbdc);
724out_opt:
43ae4701
AE
725 if (ceph_opts)
726 ceph_destroy_options(ceph_opts);
37206ee5
AE
727 dout("%s: error %d\n", __func__, ret);
728
28f259b7 729 return ERR_PTR(ret);
602adf40
YS
730}
731
2f82ee54
AE
732static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
733{
734 kref_get(&rbdc->kref);
735
736 return rbdc;
737}
738
602adf40 739/*
1f7ba331
AE
740 * Find a ceph client with specific addr and configuration. If
741 * found, bump its reference count.
602adf40 742 */
1f7ba331 743static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
744{
745 struct rbd_client *client_node;
1f7ba331 746 bool found = false;
602adf40 747
43ae4701 748 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
749 return NULL;
750
1f7ba331
AE
751 spin_lock(&rbd_client_list_lock);
752 list_for_each_entry(client_node, &rbd_client_list, node) {
753 if (!ceph_compare_options(ceph_opts, client_node->client)) {
2f82ee54
AE
754 __rbd_get_client(client_node);
755
1f7ba331
AE
756 found = true;
757 break;
758 }
759 }
760 spin_unlock(&rbd_client_list_lock);
761
762 return found ? client_node : NULL;
602adf40
YS
763}
764
59c2be1e 765/*
210c104c 766 * (Per device) rbd map options
59c2be1e
YS
767 */
768enum {
b5584180 769 Opt_queue_depth,
59c2be1e
YS
770 Opt_last_int,
771 /* int args above */
772 Opt_last_string,
773 /* string args above */
cc0538b6
AE
774 Opt_read_only,
775 Opt_read_write,
80de1912 776 Opt_lock_on_read,
e010dd0a 777 Opt_exclusive,
210c104c 778 Opt_err
59c2be1e
YS
779};
780
43ae4701 781static match_table_t rbd_opts_tokens = {
b5584180 782 {Opt_queue_depth, "queue_depth=%d"},
59c2be1e
YS
783 /* int args above */
784 /* string args above */
be466c1c 785 {Opt_read_only, "read_only"},
cc0538b6
AE
786 {Opt_read_only, "ro"}, /* Alternate spelling */
787 {Opt_read_write, "read_write"},
788 {Opt_read_write, "rw"}, /* Alternate spelling */
80de1912 789 {Opt_lock_on_read, "lock_on_read"},
e010dd0a 790 {Opt_exclusive, "exclusive"},
210c104c 791 {Opt_err, NULL}
59c2be1e
YS
792};
793
98571b5a 794struct rbd_options {
b5584180 795 int queue_depth;
98571b5a 796 bool read_only;
80de1912 797 bool lock_on_read;
e010dd0a 798 bool exclusive;
98571b5a
AE
799};
800
b5584180 801#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
98571b5a 802#define RBD_READ_ONLY_DEFAULT false
80de1912 803#define RBD_LOCK_ON_READ_DEFAULT false
e010dd0a 804#define RBD_EXCLUSIVE_DEFAULT false
98571b5a 805
59c2be1e
YS
806static int parse_rbd_opts_token(char *c, void *private)
807{
43ae4701 808 struct rbd_options *rbd_opts = private;
59c2be1e
YS
809 substring_t argstr[MAX_OPT_ARGS];
810 int token, intval, ret;
811
43ae4701 812 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
813 if (token < Opt_last_int) {
814 ret = match_int(&argstr[0], &intval);
815 if (ret < 0) {
210c104c 816 pr_err("bad mount option arg (not int) at '%s'\n", c);
59c2be1e
YS
817 return ret;
818 }
819 dout("got int token %d val %d\n", token, intval);
820 } else if (token > Opt_last_int && token < Opt_last_string) {
210c104c 821 dout("got string token %d val %s\n", token, argstr[0].from);
59c2be1e
YS
822 } else {
823 dout("got token %d\n", token);
824 }
825
826 switch (token) {
b5584180
ID
827 case Opt_queue_depth:
828 if (intval < 1) {
829 pr_err("queue_depth out of range\n");
830 return -EINVAL;
831 }
832 rbd_opts->queue_depth = intval;
833 break;
cc0538b6
AE
834 case Opt_read_only:
835 rbd_opts->read_only = true;
836 break;
837 case Opt_read_write:
838 rbd_opts->read_only = false;
839 break;
80de1912
ID
840 case Opt_lock_on_read:
841 rbd_opts->lock_on_read = true;
842 break;
e010dd0a
ID
843 case Opt_exclusive:
844 rbd_opts->exclusive = true;
845 break;
59c2be1e 846 default:
210c104c
ID
847 /* libceph prints "bad option" msg */
848 return -EINVAL;
59c2be1e 849 }
210c104c 850
59c2be1e
YS
851 return 0;
852}
853
6d2940c8
GZ
854static char* obj_op_name(enum obj_operation_type op_type)
855{
856 switch (op_type) {
857 case OBJ_OP_READ:
858 return "read";
859 case OBJ_OP_WRITE:
860 return "write";
90e98c52
GZ
861 case OBJ_OP_DISCARD:
862 return "discard";
6d2940c8
GZ
863 default:
864 return "???";
865 }
866}
867
602adf40
YS
868/*
869 * Get a ceph client with specific addr and configuration, if one does
7262cfca
AE
870 * not exist create it. Either way, ceph_opts is consumed by this
871 * function.
602adf40 872 */
9d3997fd 873static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
602adf40 874{
f8c38929 875 struct rbd_client *rbdc;
59c2be1e 876
cfbf6377 877 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
1f7ba331 878 rbdc = rbd_client_find(ceph_opts);
9d3997fd 879 if (rbdc) /* using an existing client */
43ae4701 880 ceph_destroy_options(ceph_opts);
9d3997fd 881 else
f8c38929 882 rbdc = rbd_client_create(ceph_opts);
cfbf6377 883 mutex_unlock(&client_mutex);
602adf40 884
9d3997fd 885 return rbdc;
602adf40
YS
886}
887
888/*
889 * Destroy ceph client
d23a4b3f 890 *
432b8587 891 * Caller must hold rbd_client_list_lock.
602adf40
YS
892 */
893static void rbd_client_release(struct kref *kref)
894{
895 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
896
37206ee5 897 dout("%s: rbdc %p\n", __func__, rbdc);
cd9d9f5d 898 spin_lock(&rbd_client_list_lock);
602adf40 899 list_del(&rbdc->node);
cd9d9f5d 900 spin_unlock(&rbd_client_list_lock);
602adf40
YS
901
902 ceph_destroy_client(rbdc->client);
903 kfree(rbdc);
904}
905
906/*
907 * Drop reference to ceph client node. If it's not referenced anymore, release
908 * it.
909 */
9d3997fd 910static void rbd_put_client(struct rbd_client *rbdc)
602adf40 911{
c53d5893
AE
912 if (rbdc)
913 kref_put(&rbdc->kref, rbd_client_release);
602adf40
YS
914}
915
a30b71b9
AE
916static bool rbd_image_format_valid(u32 image_format)
917{
918 return image_format == 1 || image_format == 2;
919}
920
8e94af8e
AE
921static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
922{
103a150f
AE
923 size_t size;
924 u32 snap_count;
925
926 /* The header has to start with the magic rbd header text */
927 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
928 return false;
929
db2388b6
AE
930 /* The bio layer requires at least sector-sized I/O */
931
932 if (ondisk->options.order < SECTOR_SHIFT)
933 return false;
934
935 /* If we use u64 in a few spots we may be able to loosen this */
936
937 if (ondisk->options.order > 8 * sizeof (int) - 1)
938 return false;
939
103a150f
AE
940 /*
941 * The size of a snapshot header has to fit in a size_t, and
942 * that limits the number of snapshots.
943 */
944 snap_count = le32_to_cpu(ondisk->snap_count);
945 size = SIZE_MAX - sizeof (struct ceph_snap_context);
946 if (snap_count > size / sizeof (__le64))
947 return false;
948
949 /*
950 * Not only that, but the size of the entire the snapshot
951 * header must also be representable in a size_t.
952 */
953 size -= snap_count * sizeof (__le64);
954 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
955 return false;
956
957 return true;
8e94af8e
AE
958}
959
5bc3fb17
ID
960/*
961 * returns the size of an object in the image
962 */
963static u32 rbd_obj_bytes(struct rbd_image_header *header)
964{
965 return 1U << header->obj_order;
966}
967
263423f8
ID
968static void rbd_init_layout(struct rbd_device *rbd_dev)
969{
970 if (rbd_dev->header.stripe_unit == 0 ||
971 rbd_dev->header.stripe_count == 0) {
972 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
973 rbd_dev->header.stripe_count = 1;
974 }
975
976 rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
977 rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
978 rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
7e97332e
ID
979 rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
980 rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
263423f8
ID
981 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
982}
983
602adf40 984/*
bb23e37a
AE
985 * Fill an rbd image header with information from the given format 1
986 * on-disk header.
602adf40 987 */
662518b1 988static int rbd_header_from_disk(struct rbd_device *rbd_dev,
4156d998 989 struct rbd_image_header_ondisk *ondisk)
602adf40 990{
662518b1 991 struct rbd_image_header *header = &rbd_dev->header;
bb23e37a
AE
992 bool first_time = header->object_prefix == NULL;
993 struct ceph_snap_context *snapc;
994 char *object_prefix = NULL;
995 char *snap_names = NULL;
996 u64 *snap_sizes = NULL;
ccece235 997 u32 snap_count;
bb23e37a 998 int ret = -ENOMEM;
621901d6 999 u32 i;
602adf40 1000
bb23e37a 1001 /* Allocate this now to avoid having to handle failure below */
6a52325f 1002
bb23e37a 1003 if (first_time) {
848d796c
ID
1004 object_prefix = kstrndup(ondisk->object_prefix,
1005 sizeof(ondisk->object_prefix),
1006 GFP_KERNEL);
bb23e37a
AE
1007 if (!object_prefix)
1008 return -ENOMEM;
bb23e37a 1009 }
00f1f36f 1010
bb23e37a 1011 /* Allocate the snapshot context and fill it in */
00f1f36f 1012
bb23e37a
AE
1013 snap_count = le32_to_cpu(ondisk->snap_count);
1014 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1015 if (!snapc)
1016 goto out_err;
1017 snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 1018 if (snap_count) {
bb23e37a 1019 struct rbd_image_snap_ondisk *snaps;
f785cc1d
AE
1020 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1021
bb23e37a 1022 /* We'll keep a copy of the snapshot names... */
621901d6 1023
bb23e37a
AE
1024 if (snap_names_len > (u64)SIZE_MAX)
1025 goto out_2big;
1026 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1027 if (!snap_names)
6a52325f
AE
1028 goto out_err;
1029
bb23e37a 1030 /* ...as well as the array of their sizes. */
88a25a5f
ME
1031 snap_sizes = kmalloc_array(snap_count,
1032 sizeof(*header->snap_sizes),
1033 GFP_KERNEL);
bb23e37a 1034 if (!snap_sizes)
6a52325f 1035 goto out_err;
bb23e37a 1036
f785cc1d 1037 /*
bb23e37a
AE
1038 * Copy the names, and fill in each snapshot's id
1039 * and size.
1040 *
99a41ebc 1041 * Note that rbd_dev_v1_header_info() guarantees the
bb23e37a 1042 * ondisk buffer we're working with has
f785cc1d
AE
1043 * snap_names_len bytes beyond the end of the
1044 * snapshot id array, this memcpy() is safe.
1045 */
bb23e37a
AE
1046 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1047 snaps = ondisk->snaps;
1048 for (i = 0; i < snap_count; i++) {
1049 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1050 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1051 }
602adf40 1052 }
6a52325f 1053
bb23e37a 1054 /* We won't fail any more, fill in the header */
621901d6 1055
bb23e37a
AE
1056 if (first_time) {
1057 header->object_prefix = object_prefix;
1058 header->obj_order = ondisk->options.order;
263423f8 1059 rbd_init_layout(rbd_dev);
602adf40 1060 } else {
662518b1
AE
1061 ceph_put_snap_context(header->snapc);
1062 kfree(header->snap_names);
1063 kfree(header->snap_sizes);
602adf40 1064 }
849b4260 1065
bb23e37a 1066 /* The remaining fields always get updated (when we refresh) */
621901d6 1067
f84344f3 1068 header->image_size = le64_to_cpu(ondisk->image_size);
bb23e37a
AE
1069 header->snapc = snapc;
1070 header->snap_names = snap_names;
1071 header->snap_sizes = snap_sizes;
468521c1 1072
602adf40 1073 return 0;
bb23e37a
AE
1074out_2big:
1075 ret = -EIO;
6a52325f 1076out_err:
bb23e37a
AE
1077 kfree(snap_sizes);
1078 kfree(snap_names);
1079 ceph_put_snap_context(snapc);
1080 kfree(object_prefix);
ccece235 1081
bb23e37a 1082 return ret;
602adf40
YS
1083}
1084
9682fc6d
AE
1085static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1086{
1087 const char *snap_name;
1088
1089 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1090
1091 /* Skip over names until we find the one we are looking for */
1092
1093 snap_name = rbd_dev->header.snap_names;
1094 while (which--)
1095 snap_name += strlen(snap_name) + 1;
1096
1097 return kstrdup(snap_name, GFP_KERNEL);
1098}
1099
30d1cff8
AE
1100/*
1101 * Snapshot id comparison function for use with qsort()/bsearch().
1102 * Note that result is for snapshots in *descending* order.
1103 */
1104static int snapid_compare_reverse(const void *s1, const void *s2)
1105{
1106 u64 snap_id1 = *(u64 *)s1;
1107 u64 snap_id2 = *(u64 *)s2;
1108
1109 if (snap_id1 < snap_id2)
1110 return 1;
1111 return snap_id1 == snap_id2 ? 0 : -1;
1112}
1113
1114/*
1115 * Search a snapshot context to see if the given snapshot id is
1116 * present.
1117 *
1118 * Returns the position of the snapshot id in the array if it's found,
1119 * or BAD_SNAP_INDEX otherwise.
1120 *
1121 * Note: The snapshot array is in kept sorted (by the osd) in
1122 * reverse order, highest snapshot id first.
1123 */
9682fc6d
AE
1124static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1125{
1126 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
30d1cff8 1127 u64 *found;
9682fc6d 1128
30d1cff8
AE
1129 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1130 sizeof (snap_id), snapid_compare_reverse);
9682fc6d 1131
30d1cff8 1132 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
9682fc6d
AE
1133}
1134
2ad3d716
AE
1135static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1136 u64 snap_id)
9e15b77d 1137{
54cac61f 1138 u32 which;
da6a6b63 1139 const char *snap_name;
9e15b77d 1140
54cac61f
AE
1141 which = rbd_dev_snap_index(rbd_dev, snap_id);
1142 if (which == BAD_SNAP_INDEX)
da6a6b63 1143 return ERR_PTR(-ENOENT);
54cac61f 1144
da6a6b63
JD
1145 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1146 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
54cac61f
AE
1147}
1148
1149static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1150{
9e15b77d
AE
1151 if (snap_id == CEPH_NOSNAP)
1152 return RBD_SNAP_HEAD_NAME;
1153
54cac61f
AE
1154 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1155 if (rbd_dev->image_format == 1)
1156 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
9e15b77d 1157
54cac61f 1158 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
9e15b77d
AE
1159}
1160
2ad3d716
AE
1161static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1162 u64 *snap_size)
602adf40 1163{
2ad3d716
AE
1164 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1165 if (snap_id == CEPH_NOSNAP) {
1166 *snap_size = rbd_dev->header.image_size;
1167 } else if (rbd_dev->image_format == 1) {
1168 u32 which;
602adf40 1169
2ad3d716
AE
1170 which = rbd_dev_snap_index(rbd_dev, snap_id);
1171 if (which == BAD_SNAP_INDEX)
1172 return -ENOENT;
e86924a8 1173
2ad3d716
AE
1174 *snap_size = rbd_dev->header.snap_sizes[which];
1175 } else {
1176 u64 size = 0;
1177 int ret;
1178
1179 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1180 if (ret)
1181 return ret;
1182
1183 *snap_size = size;
1184 }
1185 return 0;
602adf40
YS
1186}
1187
2ad3d716
AE
1188static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1189 u64 *snap_features)
602adf40 1190{
2ad3d716
AE
1191 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1192 if (snap_id == CEPH_NOSNAP) {
1193 *snap_features = rbd_dev->header.features;
1194 } else if (rbd_dev->image_format == 1) {
1195 *snap_features = 0; /* No features for format 1 */
602adf40 1196 } else {
2ad3d716
AE
1197 u64 features = 0;
1198 int ret;
8b0241f8 1199
2ad3d716
AE
1200 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1201 if (ret)
1202 return ret;
1203
1204 *snap_features = features;
1205 }
1206 return 0;
1207}
1208
1209static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1210{
8f4b7d98 1211 u64 snap_id = rbd_dev->spec->snap_id;
2ad3d716
AE
1212 u64 size = 0;
1213 u64 features = 0;
1214 int ret;
1215
2ad3d716
AE
1216 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1217 if (ret)
1218 return ret;
1219 ret = rbd_snap_features(rbd_dev, snap_id, &features);
1220 if (ret)
1221 return ret;
1222
1223 rbd_dev->mapping.size = size;
1224 rbd_dev->mapping.features = features;
1225
8b0241f8 1226 return 0;
602adf40
YS
1227}
1228
d1cf5788
AE
1229static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1230{
1231 rbd_dev->mapping.size = 0;
1232 rbd_dev->mapping.features = 0;
200a6a8b
AE
1233}
1234
65ccfe21
AE
1235static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1236{
5bc3fb17 1237 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
602adf40 1238
65ccfe21
AE
1239 return offset & (segment_size - 1);
1240}
1241
1242static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1243 u64 offset, u64 length)
1244{
5bc3fb17 1245 u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
65ccfe21
AE
1246
1247 offset &= segment_size - 1;
1248
aafb230e 1249 rbd_assert(length <= U64_MAX - offset);
65ccfe21
AE
1250 if (offset + length > segment_size)
1251 length = segment_size - offset;
1252
1253 return length;
602adf40
YS
1254}
1255
1256/*
1257 * bio helpers
1258 */
1259
1260static void bio_chain_put(struct bio *chain)
1261{
1262 struct bio *tmp;
1263
1264 while (chain) {
1265 tmp = chain;
1266 chain = chain->bi_next;
1267 bio_put(tmp);
1268 }
1269}
1270
1271/*
1272 * zeros a bio chain, starting at specific offset
1273 */
1274static void zero_bio_chain(struct bio *chain, int start_ofs)
1275{
7988613b
KO
1276 struct bio_vec bv;
1277 struct bvec_iter iter;
602adf40
YS
1278 unsigned long flags;
1279 void *buf;
602adf40
YS
1280 int pos = 0;
1281
1282 while (chain) {
7988613b
KO
1283 bio_for_each_segment(bv, chain, iter) {
1284 if (pos + bv.bv_len > start_ofs) {
602adf40 1285 int remainder = max(start_ofs - pos, 0);
7988613b 1286 buf = bvec_kmap_irq(&bv, &flags);
602adf40 1287 memset(buf + remainder, 0,
7988613b
KO
1288 bv.bv_len - remainder);
1289 flush_dcache_page(bv.bv_page);
85b5aaa6 1290 bvec_kunmap_irq(buf, &flags);
602adf40 1291 }
7988613b 1292 pos += bv.bv_len;
602adf40
YS
1293 }
1294
1295 chain = chain->bi_next;
1296 }
1297}
1298
b9434c5b
AE
1299/*
1300 * similar to zero_bio_chain(), zeros data defined by a page array,
1301 * starting at the given byte offset from the start of the array and
1302 * continuing up to the given end offset. The pages array is
1303 * assumed to be big enough to hold all bytes up to the end.
1304 */
1305static void zero_pages(struct page **pages, u64 offset, u64 end)
1306{
1307 struct page **page = &pages[offset >> PAGE_SHIFT];
1308
1309 rbd_assert(end > offset);
1310 rbd_assert(end - offset <= (u64)SIZE_MAX);
1311 while (offset < end) {
1312 size_t page_offset;
1313 size_t length;
1314 unsigned long flags;
1315 void *kaddr;
1316
491205a8
GU
1317 page_offset = offset & ~PAGE_MASK;
1318 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
b9434c5b
AE
1319 local_irq_save(flags);
1320 kaddr = kmap_atomic(*page);
1321 memset(kaddr + page_offset, 0, length);
e2156054 1322 flush_dcache_page(*page);
b9434c5b
AE
1323 kunmap_atomic(kaddr);
1324 local_irq_restore(flags);
1325
1326 offset += length;
1327 page++;
1328 }
1329}
1330
602adf40 1331/*
f7760dad
AE
1332 * Clone a portion of a bio, starting at the given byte offset
1333 * and continuing for the number of bytes indicated.
602adf40 1334 */
f7760dad
AE
1335static struct bio *bio_clone_range(struct bio *bio_src,
1336 unsigned int offset,
1337 unsigned int len,
1338 gfp_t gfpmask)
602adf40 1339{
f7760dad
AE
1340 struct bio *bio;
1341
f856dc36 1342 bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
f7760dad
AE
1343 if (!bio)
1344 return NULL; /* ENOMEM */
602adf40 1345
5341a627 1346 bio_advance(bio, offset);
4f024f37 1347 bio->bi_iter.bi_size = len;
f7760dad
AE
1348
1349 return bio;
1350}
1351
1352/*
1353 * Clone a portion of a bio chain, starting at the given byte offset
1354 * into the first bio in the source chain and continuing for the
1355 * number of bytes indicated. The result is another bio chain of
1356 * exactly the given length, or a null pointer on error.
1357 *
1358 * The bio_src and offset parameters are both in-out. On entry they
1359 * refer to the first source bio and the offset into that bio where
1360 * the start of data to be cloned is located.
1361 *
1362 * On return, bio_src is updated to refer to the bio in the source
1363 * chain that contains first un-cloned byte, and *offset will
1364 * contain the offset of that byte within that bio.
1365 */
1366static struct bio *bio_chain_clone_range(struct bio **bio_src,
1367 unsigned int *offset,
1368 unsigned int len,
1369 gfp_t gfpmask)
1370{
1371 struct bio *bi = *bio_src;
1372 unsigned int off = *offset;
1373 struct bio *chain = NULL;
1374 struct bio **end;
1375
1376 /* Build up a chain of clone bios up to the limit */
1377
4f024f37 1378 if (!bi || off >= bi->bi_iter.bi_size || !len)
f7760dad 1379 return NULL; /* Nothing to clone */
602adf40 1380
f7760dad
AE
1381 end = &chain;
1382 while (len) {
1383 unsigned int bi_size;
1384 struct bio *bio;
1385
f5400b7a
AE
1386 if (!bi) {
1387 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
f7760dad 1388 goto out_err; /* EINVAL; ran out of bio's */
f5400b7a 1389 }
4f024f37 1390 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
f7760dad
AE
1391 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1392 if (!bio)
1393 goto out_err; /* ENOMEM */
1394
1395 *end = bio;
1396 end = &bio->bi_next;
602adf40 1397
f7760dad 1398 off += bi_size;
4f024f37 1399 if (off == bi->bi_iter.bi_size) {
f7760dad
AE
1400 bi = bi->bi_next;
1401 off = 0;
1402 }
1403 len -= bi_size;
1404 }
1405 *bio_src = bi;
1406 *offset = off;
1407
1408 return chain;
1409out_err:
1410 bio_chain_put(chain);
602adf40 1411
602adf40
YS
1412 return NULL;
1413}
1414
926f9b3f
AE
1415/*
1416 * The default/initial value for all object request flags is 0. For
1417 * each flag, once its value is set to 1 it is never reset to 0
1418 * again.
1419 */
57acbaa7 1420static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
926f9b3f 1421{
57acbaa7 1422 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
926f9b3f
AE
1423 struct rbd_device *rbd_dev;
1424
57acbaa7 1425 rbd_dev = obj_request->img_request->rbd_dev;
9584d508 1426 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
926f9b3f
AE
1427 obj_request);
1428 }
1429}
1430
57acbaa7 1431static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
926f9b3f
AE
1432{
1433 smp_mb();
57acbaa7 1434 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
926f9b3f
AE
1435}
1436
57acbaa7 1437static void obj_request_done_set(struct rbd_obj_request *obj_request)
6365d33a 1438{
57acbaa7
AE
1439 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1440 struct rbd_device *rbd_dev = NULL;
6365d33a 1441
57acbaa7
AE
1442 if (obj_request_img_data_test(obj_request))
1443 rbd_dev = obj_request->img_request->rbd_dev;
9584d508 1444 rbd_warn(rbd_dev, "obj_request %p already marked done",
6365d33a
AE
1445 obj_request);
1446 }
1447}
1448
57acbaa7 1449static bool obj_request_done_test(struct rbd_obj_request *obj_request)
6365d33a
AE
1450{
1451 smp_mb();
57acbaa7 1452 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
6365d33a
AE
1453}
1454
5679c59f
AE
1455/*
1456 * This sets the KNOWN flag after (possibly) setting the EXISTS
1457 * flag. The latter is set based on the "exists" value provided.
1458 *
1459 * Note that for our purposes once an object exists it never goes
1460 * away again. It's possible that the response from two existence
1461 * checks are separated by the creation of the target object, and
1462 * the first ("doesn't exist") response arrives *after* the second
1463 * ("does exist"). In that case we ignore the second one.
1464 */
1465static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1466 bool exists)
1467{
1468 if (exists)
1469 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1470 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1471 smp_mb();
1472}
1473
1474static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1475{
1476 smp_mb();
1477 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1478}
1479
1480static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1481{
1482 smp_mb();
1483 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1484}
1485
9638556a
ID
1486static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1487{
1488 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1489
1490 return obj_request->img_offset <
1491 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1492}
1493
bf0d5f50
AE
1494static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1495{
37206ee5 1496 dout("%s: obj %p (was %d)\n", __func__, obj_request,
2c935bc5 1497 kref_read(&obj_request->kref));
bf0d5f50
AE
1498 kref_get(&obj_request->kref);
1499}
1500
1501static void rbd_obj_request_destroy(struct kref *kref);
1502static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1503{
1504 rbd_assert(obj_request != NULL);
37206ee5 1505 dout("%s: obj %p (was %d)\n", __func__, obj_request,
2c935bc5 1506 kref_read(&obj_request->kref));
bf0d5f50
AE
1507 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1508}
1509
0f2d5be7
AE
1510static void rbd_img_request_get(struct rbd_img_request *img_request)
1511{
1512 dout("%s: img %p (was %d)\n", __func__, img_request,
2c935bc5 1513 kref_read(&img_request->kref));
0f2d5be7
AE
1514 kref_get(&img_request->kref);
1515}
1516
e93f3152
AE
1517static bool img_request_child_test(struct rbd_img_request *img_request);
1518static void rbd_parent_request_destroy(struct kref *kref);
bf0d5f50
AE
1519static void rbd_img_request_destroy(struct kref *kref);
1520static void rbd_img_request_put(struct rbd_img_request *img_request)
1521{
1522 rbd_assert(img_request != NULL);
37206ee5 1523 dout("%s: img %p (was %d)\n", __func__, img_request,
2c935bc5 1524 kref_read(&img_request->kref));
e93f3152
AE
1525 if (img_request_child_test(img_request))
1526 kref_put(&img_request->kref, rbd_parent_request_destroy);
1527 else
1528 kref_put(&img_request->kref, rbd_img_request_destroy);
bf0d5f50
AE
1529}
1530
1531static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1532 struct rbd_obj_request *obj_request)
1533{
25dcf954
AE
1534 rbd_assert(obj_request->img_request == NULL);
1535
b155e86c 1536 /* Image request now owns object's original reference */
bf0d5f50 1537 obj_request->img_request = img_request;
25dcf954 1538 obj_request->which = img_request->obj_request_count;
6365d33a
AE
1539 rbd_assert(!obj_request_img_data_test(obj_request));
1540 obj_request_img_data_set(obj_request);
bf0d5f50 1541 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1542 img_request->obj_request_count++;
1543 list_add_tail(&obj_request->links, &img_request->obj_requests);
37206ee5
AE
1544 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1545 obj_request->which);
bf0d5f50
AE
1546}
1547
1548static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1549 struct rbd_obj_request *obj_request)
1550{
1551 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1552
37206ee5
AE
1553 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1554 obj_request->which);
bf0d5f50 1555 list_del(&obj_request->links);
25dcf954
AE
1556 rbd_assert(img_request->obj_request_count > 0);
1557 img_request->obj_request_count--;
1558 rbd_assert(obj_request->which == img_request->obj_request_count);
1559 obj_request->which = BAD_WHICH;
6365d33a 1560 rbd_assert(obj_request_img_data_test(obj_request));
bf0d5f50 1561 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1562 obj_request->img_request = NULL;
25dcf954 1563 obj_request->callback = NULL;
bf0d5f50
AE
1564 rbd_obj_request_put(obj_request);
1565}
1566
1567static bool obj_request_type_valid(enum obj_request_type type)
1568{
1569 switch (type) {
9969ebc5 1570 case OBJ_REQUEST_NODATA:
bf0d5f50 1571 case OBJ_REQUEST_BIO:
788e2df3 1572 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1573 return true;
1574 default:
1575 return false;
1576 }
1577}
1578
4a17dadc
ID
1579static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1580
980917fc 1581static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
bf0d5f50 1582{
980917fc
ID
1583 struct ceph_osd_request *osd_req = obj_request->osd_req;
1584
a90bb0c1
ID
1585 dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1586 obj_request, obj_request->object_no, obj_request->offset,
67e2b652 1587 obj_request->length, osd_req);
4a17dadc
ID
1588 if (obj_request_img_data_test(obj_request)) {
1589 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1590 rbd_img_request_get(obj_request->img_request);
1591 }
980917fc 1592 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
bf0d5f50
AE
1593}
1594
1595static void rbd_img_request_complete(struct rbd_img_request *img_request)
1596{
55f27e09 1597
37206ee5 1598 dout("%s: img %p\n", __func__, img_request);
55f27e09
AE
1599
1600 /*
1601 * If no error occurred, compute the aggregate transfer
1602 * count for the image request. We could instead use
1603 * atomic64_cmpxchg() to update it as each object request
1604 * completes; not clear which way is better off hand.
1605 */
1606 if (!img_request->result) {
1607 struct rbd_obj_request *obj_request;
1608 u64 xferred = 0;
1609
1610 for_each_obj_request(img_request, obj_request)
1611 xferred += obj_request->xferred;
1612 img_request->xferred = xferred;
1613 }
1614
bf0d5f50
AE
1615 if (img_request->callback)
1616 img_request->callback(img_request);
1617 else
1618 rbd_img_request_put(img_request);
1619}
1620
0c425248
AE
1621/*
1622 * The default/initial value for all image request flags is 0. Each
1623 * is conditionally set to 1 at image request initialization time
1624 * and currently never change thereafter.
1625 */
1626static void img_request_write_set(struct rbd_img_request *img_request)
1627{
1628 set_bit(IMG_REQ_WRITE, &img_request->flags);
1629 smp_mb();
1630}
1631
1632static bool img_request_write_test(struct rbd_img_request *img_request)
1633{
1634 smp_mb();
1635 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1636}
1637
90e98c52
GZ
1638/*
1639 * Set the discard flag when the img_request is an discard request
1640 */
1641static void img_request_discard_set(struct rbd_img_request *img_request)
1642{
1643 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1644 smp_mb();
1645}
1646
1647static bool img_request_discard_test(struct rbd_img_request *img_request)
1648{
1649 smp_mb();
1650 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1651}
1652
9849e986
AE
1653static void img_request_child_set(struct rbd_img_request *img_request)
1654{
1655 set_bit(IMG_REQ_CHILD, &img_request->flags);
1656 smp_mb();
1657}
1658
e93f3152
AE
1659static void img_request_child_clear(struct rbd_img_request *img_request)
1660{
1661 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1662 smp_mb();
1663}
1664
9849e986
AE
1665static bool img_request_child_test(struct rbd_img_request *img_request)
1666{
1667 smp_mb();
1668 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1669}
1670
d0b2e944
AE
1671static void img_request_layered_set(struct rbd_img_request *img_request)
1672{
1673 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1674 smp_mb();
1675}
1676
a2acd00e
AE
1677static void img_request_layered_clear(struct rbd_img_request *img_request)
1678{
1679 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1680 smp_mb();
1681}
1682
d0b2e944
AE
1683static bool img_request_layered_test(struct rbd_img_request *img_request)
1684{
1685 smp_mb();
1686 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1687}
1688
3b434a2a
JD
1689static enum obj_operation_type
1690rbd_img_request_op_type(struct rbd_img_request *img_request)
1691{
1692 if (img_request_write_test(img_request))
1693 return OBJ_OP_WRITE;
1694 else if (img_request_discard_test(img_request))
1695 return OBJ_OP_DISCARD;
1696 else
1697 return OBJ_OP_READ;
1698}
1699
6e2a4505
AE
1700static void
1701rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1702{
b9434c5b
AE
1703 u64 xferred = obj_request->xferred;
1704 u64 length = obj_request->length;
1705
6e2a4505
AE
1706 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1707 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1708 xferred, length);
6e2a4505 1709 /*
17c1cc1d
JD
1710 * ENOENT means a hole in the image. We zero-fill the entire
1711 * length of the request. A short read also implies zero-fill
1712 * to the end of the request. An error requires the whole
1713 * length of the request to be reported finished with an error
1714 * to the block layer. In each case we update the xferred
1715 * count to indicate the whole request was satisfied.
6e2a4505 1716 */
b9434c5b 1717 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1718 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1719 if (obj_request->type == OBJ_REQUEST_BIO)
1720 zero_bio_chain(obj_request->bio_list, 0);
1721 else
1722 zero_pages(obj_request->pages, 0, length);
6e2a4505 1723 obj_request->result = 0;
b9434c5b
AE
1724 } else if (xferred < length && !obj_request->result) {
1725 if (obj_request->type == OBJ_REQUEST_BIO)
1726 zero_bio_chain(obj_request->bio_list, xferred);
1727 else
1728 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1729 }
17c1cc1d 1730 obj_request->xferred = length;
6e2a4505
AE
1731 obj_request_done_set(obj_request);
1732}
1733
bf0d5f50
AE
1734static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1735{
37206ee5
AE
1736 dout("%s: obj %p cb %p\n", __func__, obj_request,
1737 obj_request->callback);
bf0d5f50
AE
1738 if (obj_request->callback)
1739 obj_request->callback(obj_request);
788e2df3
AE
1740 else
1741 complete_all(&obj_request->completion);
bf0d5f50
AE
1742}
1743
0dcc685e
ID
1744static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1745{
1746 obj_request->result = err;
1747 obj_request->xferred = 0;
1748 /*
1749 * kludge - mirror rbd_obj_request_submit() to match a put in
1750 * rbd_img_obj_callback()
1751 */
1752 if (obj_request_img_data_test(obj_request)) {
1753 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1754 rbd_img_request_get(obj_request->img_request);
1755 }
1756 obj_request_done_set(obj_request);
1757 rbd_obj_request_complete(obj_request);
1758}
1759
c47f9371 1760static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1761{
57acbaa7 1762 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1763 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1764 bool layered = false;
1765
1766 if (obj_request_img_data_test(obj_request)) {
1767 img_request = obj_request->img_request;
1768 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1769 rbd_dev = img_request->rbd_dev;
57acbaa7 1770 }
8b3e1a56
AE
1771
1772 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1773 obj_request, img_request, obj_request->result,
1774 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1775 if (layered && obj_request->result == -ENOENT &&
1776 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1777 rbd_img_parent_read(obj_request);
1778 else if (img_request)
6e2a4505
AE
1779 rbd_img_obj_request_read_callback(obj_request);
1780 else
1781 obj_request_done_set(obj_request);
bf0d5f50
AE
1782}
1783
c47f9371 1784static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1785{
1b83bef2
SW
1786 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1787 obj_request->result, obj_request->length);
1788 /*
8b3e1a56
AE
1789 * There is no such thing as a successful short write. Set
1790 * it to our originally-requested length.
1b83bef2
SW
1791 */
1792 obj_request->xferred = obj_request->length;
07741308 1793 obj_request_done_set(obj_request);
bf0d5f50
AE
1794}
1795
90e98c52
GZ
1796static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1797{
1798 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1799 obj_request->result, obj_request->length);
1800 /*
1801 * There is no such thing as a successful short discard. Set
1802 * it to our originally-requested length.
1803 */
1804 obj_request->xferred = obj_request->length;
d0265de7
JD
1805 /* discarding a non-existent object is not a problem */
1806 if (obj_request->result == -ENOENT)
1807 obj_request->result = 0;
90e98c52
GZ
1808 obj_request_done_set(obj_request);
1809}
1810
fbfab539
AE
1811/*
1812 * For a simple stat call there's nothing to do. We'll do more if
1813 * this is part of a write sequence for a layered image.
1814 */
c47f9371 1815static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1816{
37206ee5 1817 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1818 obj_request_done_set(obj_request);
1819}
1820
2761713d
ID
1821static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1822{
1823 dout("%s: obj %p\n", __func__, obj_request);
1824
1825 if (obj_request_img_data_test(obj_request))
1826 rbd_osd_copyup_callback(obj_request);
1827 else
1828 obj_request_done_set(obj_request);
1829}
1830
85e084fe 1831static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1832{
1833 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1834 u16 opcode;
1835
85e084fe 1836 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1837 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1838 if (obj_request_img_data_test(obj_request)) {
1839 rbd_assert(obj_request->img_request);
1840 rbd_assert(obj_request->which != BAD_WHICH);
1841 } else {
1842 rbd_assert(obj_request->which == BAD_WHICH);
1843 }
bf0d5f50 1844
1b83bef2
SW
1845 if (osd_req->r_result < 0)
1846 obj_request->result = osd_req->r_result;
bf0d5f50 1847
c47f9371
AE
1848 /*
1849 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1850 * passed to the block layer, which just supports a 32-bit
1851 * length field.
c47f9371 1852 */
7665d85b 1853 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1854 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1855
79528734 1856 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1857 switch (opcode) {
1858 case CEPH_OSD_OP_READ:
c47f9371 1859 rbd_osd_read_callback(obj_request);
bf0d5f50 1860 break;
0ccd5926 1861 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1862 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1863 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1864 /* fall through */
bf0d5f50 1865 case CEPH_OSD_OP_WRITE:
e30b7577 1866 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1867 rbd_osd_write_callback(obj_request);
bf0d5f50 1868 break;
fbfab539 1869 case CEPH_OSD_OP_STAT:
c47f9371 1870 rbd_osd_stat_callback(obj_request);
fbfab539 1871 break;
90e98c52
GZ
1872 case CEPH_OSD_OP_DELETE:
1873 case CEPH_OSD_OP_TRUNCATE:
1874 case CEPH_OSD_OP_ZERO:
1875 rbd_osd_discard_callback(obj_request);
1876 break;
36be9a76 1877 case CEPH_OSD_OP_CALL:
2761713d
ID
1878 rbd_osd_call_callback(obj_request);
1879 break;
bf0d5f50 1880 default:
a90bb0c1
ID
1881 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1882 obj_request->object_no, opcode);
bf0d5f50
AE
1883 break;
1884 }
1885
07741308 1886 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1887 rbd_obj_request_complete(obj_request);
1888}
1889
9d4df01f 1890static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3 1891{
8c042b0d 1892 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1893
7c84883a
ID
1894 rbd_assert(obj_request_img_data_test(obj_request));
1895 osd_req->r_snapid = obj_request->img_request->snap_id;
9d4df01f
AE
1896}
1897
1898static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1899{
9d4df01f 1900 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1901
1134e091 1902 ktime_get_real_ts(&osd_req->r_mtime);
bb873b53 1903 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1904}
1905
bc81207e
ID
1906static struct ceph_osd_request *
1907__rbd_osd_req_create(struct rbd_device *rbd_dev,
1908 struct ceph_snap_context *snapc,
1909 int num_ops, unsigned int flags,
1910 struct rbd_obj_request *obj_request)
1911{
1912 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1913 struct ceph_osd_request *req;
a90bb0c1
ID
1914 const char *name_format = rbd_dev->image_format == 1 ?
1915 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
bc81207e
ID
1916
1917 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1918 if (!req)
1919 return NULL;
1920
1921 req->r_flags = flags;
1922 req->r_callback = rbd_osd_req_callback;
1923 req->r_priv = obj_request;
1924
1925 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
a90bb0c1
ID
1926 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1927 rbd_dev->header.object_prefix, obj_request->object_no))
bc81207e
ID
1928 goto err_req;
1929
1930 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1931 goto err_req;
1932
1933 return req;
1934
1935err_req:
1936 ceph_osdc_put_request(req);
1937 return NULL;
1938}
1939
0ccd5926
ID
1940/*
1941 * Create an osd request. A read request has one osd op (read).
1942 * A write request has either one (watch) or two (hint+write) osd ops.
1943 * (All rbd data writes are prefixed with an allocation hint op, but
1944 * technically osd watch is a write request, hence this distinction.)
1945 */
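/*
 * As filled in by rbd_img_obj_request_fill() below:
 *   read     -> one op:  READ
 *   write    -> two ops: SETALLOCHINT followed by WRITE or WRITEFULL
 *   discard  -> one op:  DELETE, TRUNCATE or ZERO
 */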
bf0d5f50
AE
1946static struct ceph_osd_request *rbd_osd_req_create(
1947 struct rbd_device *rbd_dev,
6d2940c8 1948 enum obj_operation_type op_type,
deb236b3 1949 unsigned int num_ops,
430c28c3 1950 struct rbd_obj_request *obj_request)
bf0d5f50 1951{
bf0d5f50 1952 struct ceph_snap_context *snapc = NULL;
bf0d5f50 1953
90e98c52
GZ
1954 if (obj_request_img_data_test(obj_request) &&
1955 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1956 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1957 if (op_type == OBJ_OP_WRITE) {
1958 rbd_assert(img_request_write_test(img_request));
1959 } else {
1960 rbd_assert(img_request_discard_test(img_request));
1961 }
6d2940c8 1962 snapc = img_request->snapc;
bf0d5f50
AE
1963 }
1964
6d2940c8 1965 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3 1966
bc81207e
ID
1967 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1968 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
54ea0046 1969 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
bf0d5f50
AE
1970}
1971
0eefd470 1972/*
d3246fb0
JD
1973 * Create a copyup osd request based on the information in the object
1974 * request supplied. A copyup request has two or three osd ops: a
1975 * copyup method call, potentially a hint op, and a write, truncate
1976 * or zero op.
0eefd470
AE
1977 */
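/*
 * As built in rbd_img_obj_parent_read_full_callback(), the resulting
 * op vectors are:
 *   write copyup:    CALL (rbd "copyup"), SETALLOCHINT, WRITE or WRITEFULL
 *   discard copyup:  CALL (rbd "copyup"), TRUNCATE or ZERO
 */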
1978static struct ceph_osd_request *
1979rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1980{
1981 struct rbd_img_request *img_request;
d3246fb0 1982 int num_osd_ops = 3;
0eefd470
AE
1983
1984 rbd_assert(obj_request_img_data_test(obj_request));
1985 img_request = obj_request->img_request;
1986 rbd_assert(img_request);
d3246fb0
JD
1987 rbd_assert(img_request_write_test(img_request) ||
1988 img_request_discard_test(img_request));
0eefd470 1989
d3246fb0
JD
1990 if (img_request_discard_test(img_request))
1991 num_osd_ops = 2;
1992
bc81207e
ID
1993 return __rbd_osd_req_create(img_request->rbd_dev,
1994 img_request->snapc, num_osd_ops,
54ea0046 1995 CEPH_OSD_FLAG_WRITE, obj_request);
0eefd470
AE
1996}
1997
bf0d5f50
AE
1998static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1999{
2000 ceph_osdc_put_request(osd_req);
2001}
2002
6c696d85
ID
2003static struct rbd_obj_request *
2004rbd_obj_request_create(enum obj_request_type type)
bf0d5f50
AE
2005{
2006 struct rbd_obj_request *obj_request;
bf0d5f50
AE
2007
2008 rbd_assert(obj_request_type_valid(type));
2009
5a60e876 2010 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
6c696d85 2011 if (!obj_request)
f907ad55 2012 return NULL;
f907ad55 2013
bf0d5f50
AE
2014 obj_request->which = BAD_WHICH;
2015 obj_request->type = type;
2016 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2017 init_completion(&obj_request->completion);
bf0d5f50
AE
2018 kref_init(&obj_request->kref);
2019
67e2b652 2020 dout("%s %p\n", __func__, obj_request);
bf0d5f50
AE
2021 return obj_request;
2022}
2023
2024static void rbd_obj_request_destroy(struct kref *kref)
2025{
2026 struct rbd_obj_request *obj_request;
2027
2028 obj_request = container_of(kref, struct rbd_obj_request, kref);
2029
37206ee5
AE
2030 dout("%s: obj %p\n", __func__, obj_request);
2031
bf0d5f50
AE
2032 rbd_assert(obj_request->img_request == NULL);
2033 rbd_assert(obj_request->which == BAD_WHICH);
2034
2035 if (obj_request->osd_req)
2036 rbd_osd_req_destroy(obj_request->osd_req);
2037
2038 rbd_assert(obj_request_type_valid(obj_request->type));
2039 switch (obj_request->type) {
9969ebc5
AE
2040 case OBJ_REQUEST_NODATA:
2041 break; /* Nothing to do */
bf0d5f50
AE
2042 case OBJ_REQUEST_BIO:
2043 if (obj_request->bio_list)
2044 bio_chain_put(obj_request->bio_list);
2045 break;
788e2df3 2046 case OBJ_REQUEST_PAGES:
04dc923c
ID
2047 /* img_data requests don't own their page array */
2048 if (obj_request->pages &&
2049 !obj_request_img_data_test(obj_request))
788e2df3
AE
2050 ceph_release_page_vector(obj_request->pages,
2051 obj_request->page_count);
2052 break;
bf0d5f50
AE
2053 }
2054
868311b1 2055 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2056}
2057
fb65d228
AE
2058/* It's OK to call this for a device with no parent */
2059
2060static void rbd_spec_put(struct rbd_spec *spec);
2061static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2062{
2063 rbd_dev_remove_parent(rbd_dev);
2064 rbd_spec_put(rbd_dev->parent_spec);
2065 rbd_dev->parent_spec = NULL;
2066 rbd_dev->parent_overlap = 0;
2067}
2068
a2acd00e
AE
2069/*
2070 * Parent image reference counting is used to determine when an
2071 * image's parent fields can be safely torn down--after there are no
2072 * more in-flight requests to the parent image. When the last
2073 * reference is dropped, cleaning them up is safe.
2074 */
2075static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2076{
2077 int counter;
2078
2079 if (!rbd_dev->parent_spec)
2080 return;
2081
2082 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2083 if (counter > 0)
2084 return;
2085
2086 /* Last reference; clean up parent data structures */
2087
2088 if (!counter)
2089 rbd_dev_unparent(rbd_dev);
2090 else
9584d508 2091 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2092}
2093
2094/*
2095 * If an image has a non-zero parent overlap, get a reference to its
2096 * parent.
2097 *
2098 * Returns true if the rbd device has a parent with a non-zero
2099 * overlap and a reference for it was successfully taken, or
2100 * false otherwise.
2101 */
2102static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2103{
ae43e9d0 2104 int counter = 0;
a2acd00e
AE
2105
2106 if (!rbd_dev->parent_spec)
2107 return false;
2108
ae43e9d0
ID
2109 down_read(&rbd_dev->header_rwsem);
2110 if (rbd_dev->parent_overlap)
2111 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2112 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2113
2114 if (counter < 0)
9584d508 2115 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2116
ae43e9d0 2117 return counter > 0;
a2acd00e
AE
2118}
2119
bf0d5f50
AE
2120/*
2121 * Caller is responsible for filling in the list of object requests
2122 * that comprises the image request, and the Linux request pointer
2123 * (if there is one).
2124 */
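/*
 * For write and discard requests the snap context passed in is stored
 * in the image request and dropped with ceph_put_snap_context() in
 * rbd_img_request_destroy(), so the caller's reference is consumed.
 */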
cc344fa1
AE
2125static struct rbd_img_request *rbd_img_request_create(
2126 struct rbd_device *rbd_dev,
bf0d5f50 2127 u64 offset, u64 length,
6d2940c8 2128 enum obj_operation_type op_type,
4e752f0a 2129 struct ceph_snap_context *snapc)
bf0d5f50
AE
2130{
2131 struct rbd_img_request *img_request;
bf0d5f50 2132
7a716aac 2133 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2134 if (!img_request)
2135 return NULL;
2136
bf0d5f50
AE
2137 img_request->rq = NULL;
2138 img_request->rbd_dev = rbd_dev;
2139 img_request->offset = offset;
2140 img_request->length = length;
0c425248 2141 img_request->flags = 0;
90e98c52
GZ
2142 if (op_type == OBJ_OP_DISCARD) {
2143 img_request_discard_set(img_request);
2144 img_request->snapc = snapc;
2145 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2146 img_request_write_set(img_request);
4e752f0a 2147 img_request->snapc = snapc;
0c425248 2148 } else {
bf0d5f50 2149 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2150 }
a2acd00e 2151 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2152 img_request_layered_set(img_request);
bf0d5f50
AE
2153 spin_lock_init(&img_request->completion_lock);
2154 img_request->next_completion = 0;
2155 img_request->callback = NULL;
a5a337d4 2156 img_request->result = 0;
bf0d5f50
AE
2157 img_request->obj_request_count = 0;
2158 INIT_LIST_HEAD(&img_request->obj_requests);
2159 kref_init(&img_request->kref);
2160
37206ee5 2161 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2162 obj_op_name(op_type), offset, length, img_request);
37206ee5 2163
bf0d5f50
AE
2164 return img_request;
2165}
2166
2167static void rbd_img_request_destroy(struct kref *kref)
2168{
2169 struct rbd_img_request *img_request;
2170 struct rbd_obj_request *obj_request;
2171 struct rbd_obj_request *next_obj_request;
2172
2173 img_request = container_of(kref, struct rbd_img_request, kref);
2174
37206ee5
AE
2175 dout("%s: img %p\n", __func__, img_request);
2176
bf0d5f50
AE
2177 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2178 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2179 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2180
a2acd00e
AE
2181 if (img_request_layered_test(img_request)) {
2182 img_request_layered_clear(img_request);
2183 rbd_dev_parent_put(img_request->rbd_dev);
2184 }
2185
bef95455
JD
2186 if (img_request_write_test(img_request) ||
2187 img_request_discard_test(img_request))
812164f8 2188 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2189
1c2a9dfe 2190 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2191}
2192
e93f3152
AE
2193static struct rbd_img_request *rbd_parent_request_create(
2194 struct rbd_obj_request *obj_request,
2195 u64 img_offset, u64 length)
2196{
2197 struct rbd_img_request *parent_request;
2198 struct rbd_device *rbd_dev;
2199
2200 rbd_assert(obj_request->img_request);
2201 rbd_dev = obj_request->img_request->rbd_dev;
2202
4e752f0a 2203 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2204 length, OBJ_OP_READ, NULL);
e93f3152
AE
2205 if (!parent_request)
2206 return NULL;
2207
2208 img_request_child_set(parent_request);
2209 rbd_obj_request_get(obj_request);
2210 parent_request->obj_request = obj_request;
2211
2212 return parent_request;
2213}
2214
2215static void rbd_parent_request_destroy(struct kref *kref)
2216{
2217 struct rbd_img_request *parent_request;
2218 struct rbd_obj_request *orig_request;
2219
2220 parent_request = container_of(kref, struct rbd_img_request, kref);
2221 orig_request = parent_request->obj_request;
2222
2223 parent_request->obj_request = NULL;
2224 rbd_obj_request_put(orig_request);
2225 img_request_child_clear(parent_request);
2226
2227 rbd_img_request_destroy(kref);
2228}
2229
1217857f
AE
2230static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2231{
6365d33a 2232 struct rbd_img_request *img_request;
1217857f
AE
2233 unsigned int xferred;
2234 int result;
8b3e1a56 2235 bool more;
1217857f 2236
6365d33a
AE
2237 rbd_assert(obj_request_img_data_test(obj_request));
2238 img_request = obj_request->img_request;
2239
1217857f
AE
2240 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2241 xferred = (unsigned int)obj_request->xferred;
2242 result = obj_request->result;
2243 if (result) {
2244 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2245 enum obj_operation_type op_type;
2246
90e98c52
GZ
2247 if (img_request_discard_test(img_request))
2248 op_type = OBJ_OP_DISCARD;
2249 else if (img_request_write_test(img_request))
2250 op_type = OBJ_OP_WRITE;
2251 else
2252 op_type = OBJ_OP_READ;
1217857f 2253
9584d508 2254 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2255 obj_op_name(op_type), obj_request->length,
2256 obj_request->img_offset, obj_request->offset);
9584d508 2257 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2258 result, xferred);
2259 if (!img_request->result)
2260 img_request->result = result;
082a75da
ID
2261 /*
2262 * Need to end I/O on the entire obj_request worth of
2263 * bytes in case of error.
2264 */
2265 xferred = obj_request->length;
1217857f
AE
2266 }
2267
8b3e1a56
AE
2268 if (img_request_child_test(img_request)) {
2269 rbd_assert(img_request->obj_request != NULL);
2270 more = obj_request->which < img_request->obj_request_count - 1;
2271 } else {
2a842aca
CH
2272 blk_status_t status = errno_to_blk_status(result);
2273
8b3e1a56 2274 rbd_assert(img_request->rq != NULL);
7ad18afa 2275
2a842aca 2276 more = blk_update_request(img_request->rq, status, xferred);
7ad18afa 2277 if (!more)
2a842aca 2278 __blk_mq_end_request(img_request->rq, status);
8b3e1a56
AE
2279 }
2280
2281 return more;
1217857f
AE
2282}
2283
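/*
 * Object requests belonging to an image request are completed in
 * index order: the callback below only advances next_completion past
 * requests already marked done, so one that finishes out of order is
 * picked up once its predecessors complete.
 */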
2169238d
AE
2284static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2285{
2286 struct rbd_img_request *img_request;
2287 u32 which = obj_request->which;
2288 bool more = true;
2289
6365d33a 2290 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2291 img_request = obj_request->img_request;
2292
2293 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2294 rbd_assert(img_request != NULL);
2169238d
AE
2295 rbd_assert(img_request->obj_request_count > 0);
2296 rbd_assert(which != BAD_WHICH);
2297 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2298
2299 spin_lock_irq(&img_request->completion_lock);
2300 if (which != img_request->next_completion)
2301 goto out;
2302
2303 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2304 rbd_assert(more);
2305 rbd_assert(which < img_request->obj_request_count);
2306
2307 if (!obj_request_done_test(obj_request))
2308 break;
1217857f 2309 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2310 which++;
2311 }
2312
2313 rbd_assert(more ^ (which == img_request->obj_request_count));
2314 img_request->next_completion = which;
2315out:
2316 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2317 rbd_img_request_put(img_request);
2169238d
AE
2318
2319 if (!more)
2320 rbd_img_request_complete(img_request);
2321}
2322
3b434a2a
JD
2323/*
2324 * Add individual osd ops to the given ceph_osd_request and prepare
2325 * them for submission. num_ops is the current number of osd
2326 * operations already added to the osd request.
2327 */
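/*
 * Opcode selection: a whole-object discard that cannot expose parent
 * data becomes DELETE, a discard extending to the end of the object
 * (or of the image) becomes TRUNCATE, and any other discard becomes
 * ZERO.  A write covering the whole object uses WRITEFULL, otherwise
 * WRITE, and is preceded by an allocation hint op.
 */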
2328static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2329 struct ceph_osd_request *osd_request,
2330 enum obj_operation_type op_type,
2331 unsigned int num_ops)
2332{
2333 struct rbd_img_request *img_request = obj_request->img_request;
2334 struct rbd_device *rbd_dev = img_request->rbd_dev;
2335 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2336 u64 offset = obj_request->offset;
2337 u64 length = obj_request->length;
2338 u64 img_end;
2339 u16 opcode;
2340
2341 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2342 if (!offset && length == object_size &&
2343 (!img_request_layered_test(img_request) ||
2344 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2345 opcode = CEPH_OSD_OP_DELETE;
2346 } else if ((offset + length == object_size)) {
2347 opcode = CEPH_OSD_OP_TRUNCATE;
2348 } else {
2349 down_read(&rbd_dev->header_rwsem);
2350 img_end = rbd_dev->header.image_size;
2351 up_read(&rbd_dev->header_rwsem);
2352
2353 if (obj_request->img_offset + length == img_end)
2354 opcode = CEPH_OSD_OP_TRUNCATE;
2355 else
2356 opcode = CEPH_OSD_OP_ZERO;
2357 }
2358 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2359 if (!offset && length == object_size)
2360 opcode = CEPH_OSD_OP_WRITEFULL;
2361 else
2362 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2363 osd_req_op_alloc_hint_init(osd_request, num_ops,
2364 object_size, object_size);
2365 num_ops++;
2366 } else {
2367 opcode = CEPH_OSD_OP_READ;
2368 }
2369
7e868b6e 2370 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2371 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2372 else
2373 osd_req_op_extent_init(osd_request, num_ops, opcode,
2374 offset, length, 0, 0);
2375
3b434a2a
JD
2376 if (obj_request->type == OBJ_REQUEST_BIO)
2377 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2378 obj_request->bio_list, length);
2379 else if (obj_request->type == OBJ_REQUEST_PAGES)
2380 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2381 obj_request->pages, length,
2382 offset & ~PAGE_MASK, false, false);
2383
2384 /* Discards are also writes */
2385 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2386 rbd_osd_req_format_write(obj_request);
2387 else
2388 rbd_osd_req_format_read(obj_request);
2389}
2390
f1a4739f
AE
2391/*
2392 * Split up an image request into one or more object requests, each
2393 * to a different object. The "type" parameter indicates whether
2394 * "data_desc" is the pointer to the head of a list of bio
2395 * structures, or the base of a page array. In either case this
2396 * function assumes data_desc describes memory sufficient to hold
2397 * all data described by the image request.
2398 */
2399static int rbd_img_request_fill(struct rbd_img_request *img_request,
2400 enum obj_request_type type,
2401 void *data_desc)
bf0d5f50
AE
2402{
2403 struct rbd_device *rbd_dev = img_request->rbd_dev;
2404 struct rbd_obj_request *obj_request = NULL;
2405 struct rbd_obj_request *next_obj_request;
a158073c 2406 struct bio *bio_list = NULL;
f1a4739f 2407 unsigned int bio_offset = 0;
a158073c 2408 struct page **pages = NULL;
6d2940c8 2409 enum obj_operation_type op_type;
7da22d29 2410 u64 img_offset;
bf0d5f50 2411 u64 resid;
bf0d5f50 2412
f1a4739f
AE
2413 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2414 (int)type, data_desc);
37206ee5 2415
7da22d29 2416 img_offset = img_request->offset;
bf0d5f50 2417 resid = img_request->length;
4dda41d3 2418 rbd_assert(resid > 0);
3b434a2a 2419 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2420
2421 if (type == OBJ_REQUEST_BIO) {
2422 bio_list = data_desc;
4f024f37
KO
2423 rbd_assert(img_offset ==
2424 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2425 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2426 pages = data_desc;
2427 }
2428
bf0d5f50 2429 while (resid) {
2fa12320 2430 struct ceph_osd_request *osd_req;
a90bb0c1 2431 u64 object_no = img_offset >> rbd_dev->header.obj_order;
67e2b652
ID
2432 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2433 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2434
6c696d85 2435 obj_request = rbd_obj_request_create(type);
bf0d5f50
AE
2436 if (!obj_request)
2437 goto out_unwind;
62054da6 2438
a90bb0c1 2439 obj_request->object_no = object_no;
67e2b652
ID
2440 obj_request->offset = offset;
2441 obj_request->length = length;
2442
03507db6
JD
2443 /*
2444 * set obj_request->img_request before creating the
2445 * osd_request so that it gets the right snapc
2446 */
2447 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2448
f1a4739f
AE
2449 if (type == OBJ_REQUEST_BIO) {
2450 unsigned int clone_size;
2451
2452 rbd_assert(length <= (u64)UINT_MAX);
2453 clone_size = (unsigned int)length;
2454 obj_request->bio_list =
2455 bio_chain_clone_range(&bio_list,
2456 &bio_offset,
2457 clone_size,
2224d879 2458 GFP_NOIO);
f1a4739f 2459 if (!obj_request->bio_list)
62054da6 2460 goto out_unwind;
90e98c52 2461 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2462 unsigned int page_count;
2463
2464 obj_request->pages = pages;
2465 page_count = (u32)calc_pages_for(offset, length);
2466 obj_request->page_count = page_count;
2467 if ((offset + length) & ~PAGE_MASK)
2468 page_count--; /* more on last page */
2469 pages += page_count;
2470 }
bf0d5f50 2471
6d2940c8
GZ
2472 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2473 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2474 obj_request);
2fa12320 2475 if (!osd_req)
62054da6 2476 goto out_unwind;
3b434a2a 2477
2fa12320 2478 obj_request->osd_req = osd_req;
2169238d 2479 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2480 obj_request->img_offset = img_offset;
9d4df01f 2481
3b434a2a 2482 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2483
7da22d29 2484 img_offset += length;
bf0d5f50
AE
2485 resid -= length;
2486 }
2487
2488 return 0;
2489
bf0d5f50
AE
2490out_unwind:
2491 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2492 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2493
2494 return -ENOMEM;
2495}
2496
0eefd470 2497static void
2761713d 2498rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2499{
2500 struct rbd_img_request *img_request;
2501 struct rbd_device *rbd_dev;
ebda6408 2502 struct page **pages;
0eefd470
AE
2503 u32 page_count;
2504
2761713d
ID
2505 dout("%s: obj %p\n", __func__, obj_request);
2506
d3246fb0
JD
2507 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2508 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2509 rbd_assert(obj_request_img_data_test(obj_request));
2510 img_request = obj_request->img_request;
2511 rbd_assert(img_request);
2512
2513 rbd_dev = img_request->rbd_dev;
2514 rbd_assert(rbd_dev);
0eefd470 2515
ebda6408
AE
2516 pages = obj_request->copyup_pages;
2517 rbd_assert(pages != NULL);
0eefd470 2518 obj_request->copyup_pages = NULL;
ebda6408
AE
2519 page_count = obj_request->copyup_page_count;
2520 rbd_assert(page_count);
2521 obj_request->copyup_page_count = 0;
2522 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2523
2524 /*
2525 * We want the transfer count to reflect the size of the
2526 * original write request. There is no such thing as a
2527 * successful short write, so if the request was successful
2528 * we can just set it to the originally-requested length.
2529 */
2530 if (!obj_request->result)
2531 obj_request->xferred = obj_request->length;
2532
2761713d 2533 obj_request_done_set(obj_request);
0eefd470
AE
2534}
2535
3d7efd18
AE
2536static void
2537rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2538{
2539 struct rbd_obj_request *orig_request;
0eefd470 2540 struct ceph_osd_request *osd_req;
0eefd470 2541 struct rbd_device *rbd_dev;
3d7efd18 2542 struct page **pages;
d3246fb0 2543 enum obj_operation_type op_type;
ebda6408 2544 u32 page_count;
bbea1c1a 2545 int img_result;
ebda6408 2546 u64 parent_length;
3d7efd18
AE
2547
2548 rbd_assert(img_request_child_test(img_request));
2549
2550 /* First get what we need from the image request */
2551
2552 pages = img_request->copyup_pages;
2553 rbd_assert(pages != NULL);
2554 img_request->copyup_pages = NULL;
ebda6408
AE
2555 page_count = img_request->copyup_page_count;
2556 rbd_assert(page_count);
2557 img_request->copyup_page_count = 0;
3d7efd18
AE
2558
2559 orig_request = img_request->obj_request;
2560 rbd_assert(orig_request != NULL);
b91f09f1 2561 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2562 img_result = img_request->result;
ebda6408 2563 parent_length = img_request->length;
fa355112 2564 rbd_assert(img_result || parent_length == img_request->xferred);
91c6febb 2565 rbd_img_request_put(img_request);
3d7efd18 2566
91c6febb
AE
2567 rbd_assert(orig_request->img_request);
2568 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2569 rbd_assert(rbd_dev);
0eefd470 2570
bbea1c1a
AE
2571 /*
2572 * If the overlap has become 0 (most likely because the
2573 * image has been flattened) we need to free the pages
2574 * and re-submit the original write request.
2575 */
2576 if (!rbd_dev->parent_overlap) {
bbea1c1a 2577 ceph_release_page_vector(pages, page_count);
980917fc
ID
2578 rbd_obj_request_submit(orig_request);
2579 return;
bbea1c1a 2580 }
0eefd470 2581
bbea1c1a 2582 if (img_result)
0eefd470 2583 goto out_err;
0eefd470 2584
8785b1d4
AE
2585 /*
2586 * The original osd request is of no use to us any more.
0ccd5926 2587 * We need a new one that can hold the three ops in a copyup
8785b1d4
AE
2588 * request. Allocate the new copyup osd request for the
2589 * original request, and release the old one.
2590 */
bbea1c1a 2591 img_result = -ENOMEM;
0eefd470
AE
2592 osd_req = rbd_osd_req_create_copyup(orig_request);
2593 if (!osd_req)
2594 goto out_err;
8785b1d4 2595 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2596 orig_request->osd_req = osd_req;
2597 orig_request->copyup_pages = pages;
ebda6408 2598 orig_request->copyup_page_count = page_count;
3d7efd18 2599
0eefd470 2600 /* Initialize the copyup op */
3d7efd18 2601
0eefd470 2602 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2603 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2604 false, false);
3d7efd18 2605
d3246fb0 2606 /* Add the other op(s) */
0eefd470 2607
d3246fb0
JD
2608 op_type = rbd_img_request_op_type(orig_request->img_request);
2609 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2610
2611 /* All set, send it off. */
2612
980917fc
ID
2613 rbd_obj_request_submit(orig_request);
2614 return;
0eefd470 2615
0eefd470 2616out_err:
fa355112 2617 ceph_release_page_vector(pages, page_count);
0dcc685e 2618 rbd_obj_request_error(orig_request, img_result);
3d7efd18
AE
2619}
2620
2621/*
2622 * Read from the parent image the range of data that covers the
2623 * entire target of the given object request. This is used for
2624 * satisfying a layered image write request when the target of an
2625 * object request from the image request does not exist.
2626 *
2627 * A page array big enough to hold the returned data is allocated
2628 * and supplied to rbd_img_request_fill() as the "data descriptor."
2629 * When the read completes, this page array will be transferred to
2630 * the original object request for the copyup operation.
2631 *
c2e82414
ID
2632 * If an error occurs, it is recorded as the result of the original
2633 * object request in rbd_img_obj_exists_callback().
3d7efd18
AE
2634 */
2635static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2636{
058aa991 2637 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2638 struct rbd_img_request *parent_request = NULL;
3d7efd18
AE
2639 u64 img_offset;
2640 u64 length;
2641 struct page **pages = NULL;
2642 u32 page_count;
2643 int result;
2644
3d7efd18
AE
2645 rbd_assert(rbd_dev->parent != NULL);
2646
2647 /*
2648 * Determine the byte range covered by the object in the
2649 * child image to which the original request was to be sent.
2650 */
2651 img_offset = obj_request->img_offset - obj_request->offset;
5bc3fb17 2652 length = rbd_obj_bytes(&rbd_dev->header);
3d7efd18 2653
a9e8ba2c
AE
2654 /*
2655 * There is no defined parent data beyond the parent
2656 * overlap, so limit what we read at that boundary if
2657 * necessary.
2658 */
2659 if (img_offset + length > rbd_dev->parent_overlap) {
2660 rbd_assert(img_offset < rbd_dev->parent_overlap);
2661 length = rbd_dev->parent_overlap - img_offset;
2662 }
2663
3d7efd18
AE
2664 /*
2665 * Allocate a page array big enough to receive the data read
2666 * from the parent.
2667 */
2668 page_count = (u32)calc_pages_for(0, length);
1e37f2f8 2669 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
3d7efd18
AE
2670 if (IS_ERR(pages)) {
2671 result = PTR_ERR(pages);
2672 pages = NULL;
2673 goto out_err;
2674 }
2675
2676 result = -ENOMEM;
e93f3152
AE
2677 parent_request = rbd_parent_request_create(obj_request,
2678 img_offset, length);
3d7efd18
AE
2679 if (!parent_request)
2680 goto out_err;
3d7efd18
AE
2681
2682 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2683 if (result)
2684 goto out_err;
058aa991 2685
3d7efd18 2686 parent_request->copyup_pages = pages;
ebda6408 2687 parent_request->copyup_page_count = page_count;
3d7efd18 2688 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2689
3d7efd18
AE
2690 result = rbd_img_request_submit(parent_request);
2691 if (!result)
2692 return 0;
2693
2694 parent_request->copyup_pages = NULL;
ebda6408 2695 parent_request->copyup_page_count = 0;
3d7efd18
AE
2696 parent_request->obj_request = NULL;
2697 rbd_obj_request_put(obj_request);
2698out_err:
2699 if (pages)
2700 ceph_release_page_vector(pages, page_count);
2701 if (parent_request)
2702 rbd_img_request_put(parent_request);
3d7efd18
AE
2703 return result;
2704}
2705
c5b5ef6c
AE
2706static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2707{
c5b5ef6c 2708 struct rbd_obj_request *orig_request;
638f5abe 2709 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2710 int result;
2711
2712 rbd_assert(!obj_request_img_data_test(obj_request));
2713
2714 /*
2715 * All we need from the object request is the original
2716 * request and the result of the STAT op. Grab those, then
2717 * we're done with the request.
2718 */
2719 orig_request = obj_request->obj_request;
2720 obj_request->obj_request = NULL;
912c317d 2721 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2722 rbd_assert(orig_request);
2723 rbd_assert(orig_request->img_request);
2724
2725 result = obj_request->result;
2726 obj_request->result = 0;
2727
2728 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2729 obj_request, orig_request, result,
2730 obj_request->xferred, obj_request->length);
2731 rbd_obj_request_put(obj_request);
2732
638f5abe
AE
2733 /*
2734 * If the overlap has become 0 (most likely because the
980917fc
ID
2735 * image has been flattened) we need to re-submit the
2736 * original request.
638f5abe
AE
2737 */
2738 rbd_dev = orig_request->img_request->rbd_dev;
2739 if (!rbd_dev->parent_overlap) {
980917fc
ID
2740 rbd_obj_request_submit(orig_request);
2741 return;
638f5abe 2742 }
c5b5ef6c
AE
2743
2744 /*
2745 * Our only purpose here is to determine whether the object
2746 * exists, and we don't want to treat the non-existence as
2747 * an error. If something else comes back, transfer the
2748 * error to the original request and complete it now.
2749 */
2750 if (!result) {
2751 obj_request_existence_set(orig_request, true);
2752 } else if (result == -ENOENT) {
2753 obj_request_existence_set(orig_request, false);
c2e82414
ID
2754 } else {
2755 goto fail_orig_request;
c5b5ef6c
AE
2756 }
2757
2758 /*
2759 * Resubmit the original request now that we have recorded
2760 * whether the target object exists.
2761 */
c2e82414
ID
2762 result = rbd_img_obj_request_submit(orig_request);
2763 if (result)
2764 goto fail_orig_request;
2765
2766 return;
2767
2768fail_orig_request:
0dcc685e 2769 rbd_obj_request_error(orig_request, result);
c5b5ef6c
AE
2770}
2771
2772static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2773{
058aa991 2774 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2775 struct rbd_obj_request *stat_request;
710214e3 2776 struct page **pages;
c5b5ef6c
AE
2777 u32 page_count;
2778 size_t size;
2779 int ret;
2780
6c696d85 2781 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
710214e3
ID
2782 if (!stat_request)
2783 return -ENOMEM;
2784
a90bb0c1
ID
2785 stat_request->object_no = obj_request->object_no;
2786
710214e3
ID
2787 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2788 stat_request);
2789 if (!stat_request->osd_req) {
2790 ret = -ENOMEM;
2791 goto fail_stat_request;
2792 }
2793
c5b5ef6c
AE
2794 /*
2795 * The response data for a STAT call consists of:
2796 * le64 length;
2797 * struct {
2798 * le32 tv_sec;
2799 * le32 tv_nsec;
2800 * } mtime;
2801 */
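 /* 16 bytes in all; rbd_img_obj_exists_callback() only looks at the op result */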
2802 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2803 page_count = (u32)calc_pages_for(0, size);
1e37f2f8 2804 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
710214e3
ID
2805 if (IS_ERR(pages)) {
2806 ret = PTR_ERR(pages);
2807 goto fail_stat_request;
2808 }
c5b5ef6c 2809
710214e3
ID
2810 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2811 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2812 false, false);
c5b5ef6c
AE
2813
2814 rbd_obj_request_get(obj_request);
2815 stat_request->obj_request = obj_request;
2816 stat_request->pages = pages;
2817 stat_request->page_count = page_count;
c5b5ef6c
AE
2818 stat_request->callback = rbd_img_obj_exists_callback;
2819
980917fc
ID
2820 rbd_obj_request_submit(stat_request);
2821 return 0;
c5b5ef6c 2822
710214e3
ID
2823fail_stat_request:
2824 rbd_obj_request_put(stat_request);
c5b5ef6c
AE
2825 return ret;
2826}
2827
70d045f6 2828static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2829{
058aa991
ID
2830 struct rbd_img_request *img_request = obj_request->img_request;
2831 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2832
70d045f6 2833 /* Reads */
1c220881
JD
2834 if (!img_request_write_test(img_request) &&
2835 !img_request_discard_test(img_request))
70d045f6
ID
2836 return true;
2837
2838 /* Non-layered writes */
2839 if (!img_request_layered_test(img_request))
2840 return true;
2841
b454e36d 2842 /*
70d045f6
ID
2843 * Layered writes outside of the parent overlap range don't
2844 * share any data with the parent.
b454e36d 2845 */
70d045f6
ID
2846 if (!obj_request_overlaps_parent(obj_request))
2847 return true;
b454e36d 2848
c622d226
GZ
2849 /*
2850 * Entire-object layered writes - we will overwrite whatever
2851 * parent data there is anyway.
2852 */
2853 if (!obj_request->offset &&
2854 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2855 return true;
2856
70d045f6
ID
2857 /*
2858 * If the object is known to already exist, its parent data has
2859 * already been copied.
2860 */
2861 if (obj_request_known_test(obj_request) &&
2862 obj_request_exists_test(obj_request))
2863 return true;
2864
2865 return false;
2866}
2867
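/*
 * Submission policy: simple requests (reads, non-layered writes, and
 * layered writes that cannot involve parent data) are sent directly.
 * A layered write whose target object is known not to exist first
 * reads the covering range from the parent so it can be used for a
 * copyup; if existence is not yet known, a STAT request is issued to
 * find out.
 */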
2868static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2869{
058aa991
ID
2870 rbd_assert(obj_request_img_data_test(obj_request));
2871 rbd_assert(obj_request_type_valid(obj_request->type));
2872 rbd_assert(obj_request->img_request);
b454e36d 2873
70d045f6 2874 if (img_obj_request_simple(obj_request)) {
980917fc
ID
2875 rbd_obj_request_submit(obj_request);
2876 return 0;
b454e36d
AE
2877 }
2878
2879 /*
3d7efd18
AE
2880 * It's a layered write. The target object might exist but
2881 * we may not know that yet. If we know it doesn't exist,
2882 * start by reading the data for the full target object from
2883 * the parent so we can use it for a copyup to the target.
b454e36d 2884 */
70d045f6 2885 if (obj_request_known_test(obj_request))
3d7efd18
AE
2886 return rbd_img_obj_parent_read_full(obj_request);
2887
2888 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2889
2890 return rbd_img_obj_exists_submit(obj_request);
2891}
2892
bf0d5f50
AE
2893static int rbd_img_request_submit(struct rbd_img_request *img_request)
2894{
bf0d5f50 2895 struct rbd_obj_request *obj_request;
46faeed4 2896 struct rbd_obj_request *next_obj_request;
663ae2cc 2897 int ret = 0;
bf0d5f50 2898
37206ee5 2899 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2900
663ae2cc
ID
2901 rbd_img_request_get(img_request);
2902 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 2903 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 2904 if (ret)
663ae2cc 2905 goto out_put_ireq;
bf0d5f50
AE
2906 }
2907
663ae2cc
ID
2908out_put_ireq:
2909 rbd_img_request_put(img_request);
2910 return ret;
bf0d5f50 2911}
8b3e1a56
AE
2912
2913static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2914{
2915 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2916 struct rbd_device *rbd_dev;
2917 u64 obj_end;
02c74fba
AE
2918 u64 img_xferred;
2919 int img_result;
8b3e1a56
AE
2920
2921 rbd_assert(img_request_child_test(img_request));
2922
02c74fba
AE
2923 /* First get what we need from the image request and release it */
2924
8b3e1a56 2925 obj_request = img_request->obj_request;
02c74fba
AE
2926 img_xferred = img_request->xferred;
2927 img_result = img_request->result;
2928 rbd_img_request_put(img_request);
2929
2930 /*
2931 * If the overlap has become 0 (most likely because the
2932 * image has been flattened) we need to re-submit the
2933 * original request.
2934 */
a9e8ba2c
AE
2935 rbd_assert(obj_request);
2936 rbd_assert(obj_request->img_request);
02c74fba
AE
2937 rbd_dev = obj_request->img_request->rbd_dev;
2938 if (!rbd_dev->parent_overlap) {
980917fc
ID
2939 rbd_obj_request_submit(obj_request);
2940 return;
02c74fba 2941 }
a9e8ba2c 2942
02c74fba 2943 obj_request->result = img_result;
a9e8ba2c
AE
2944 if (obj_request->result)
2945 goto out;
2946
2947 /*
2948 * We need to zero anything beyond the parent overlap
2949 * boundary. Since rbd_img_obj_request_read_callback()
2950 * will zero anything beyond the end of a short read, an
2951 * easy way to do this is to pretend the data from the
2952 * parent came up short--ending at the overlap boundary.
2953 */
2954 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2955 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
2956 if (obj_end > rbd_dev->parent_overlap) {
2957 u64 xferred = 0;
2958
2959 if (obj_request->img_offset < rbd_dev->parent_overlap)
2960 xferred = rbd_dev->parent_overlap -
2961 obj_request->img_offset;
8b3e1a56 2962
02c74fba 2963 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2964 } else {
02c74fba 2965 obj_request->xferred = img_xferred;
a9e8ba2c
AE
2966 }
2967out:
8b3e1a56
AE
2968 rbd_img_obj_request_read_callback(obj_request);
2969 rbd_obj_request_complete(obj_request);
2970}
2971
2972static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2973{
8b3e1a56
AE
2974 struct rbd_img_request *img_request;
2975 int result;
2976
2977 rbd_assert(obj_request_img_data_test(obj_request));
2978 rbd_assert(obj_request->img_request != NULL);
2979 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2980 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 2981
8b3e1a56 2982 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 2983 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 2984 obj_request->img_offset,
e93f3152 2985 obj_request->length);
8b3e1a56
AE
2986 result = -ENOMEM;
2987 if (!img_request)
2988 goto out_err;
2989
5b2ab72d
AE
2990 if (obj_request->type == OBJ_REQUEST_BIO)
2991 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2992 obj_request->bio_list);
2993 else
2994 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2995 obj_request->pages);
8b3e1a56
AE
2996 if (result)
2997 goto out_err;
2998
2999 img_request->callback = rbd_img_parent_read_callback;
3000 result = rbd_img_request_submit(img_request);
3001 if (result)
3002 goto out_err;
3003
3004 return;
3005out_err:
3006 if (img_request)
3007 rbd_img_request_put(img_request);
3008 obj_request->result = result;
3009 obj_request->xferred = 0;
3010 obj_request_done_set(obj_request);
3011}
bf0d5f50 3012
ed95b21a 3013static const struct rbd_client_id rbd_empty_cid;
b8d70035 3014
ed95b21a
ID
3015static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3016 const struct rbd_client_id *rhs)
3017{
3018 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3019}
3020
3021static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3022{
3023 struct rbd_client_id cid;
3024
3025 mutex_lock(&rbd_dev->watch_mutex);
3026 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3027 cid.handle = rbd_dev->watch_cookie;
3028 mutex_unlock(&rbd_dev->watch_mutex);
3029 return cid;
3030}
3031
3032/*
3033 * lock_rwsem must be held for write
3034 */
3035static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3036 const struct rbd_client_id *cid)
3037{
3038 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3039 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3040 cid->gid, cid->handle);
3041 rbd_dev->owner_cid = *cid; /* struct */
3042}
3043
3044static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3045{
3046 mutex_lock(&rbd_dev->watch_mutex);
3047 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3048 mutex_unlock(&rbd_dev->watch_mutex);
3049}
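/*
 * The lock cookie is "<RBD_LOCK_COOKIE_PREFIX> <watch cookie>".  Tying
 * it to the watch cookie is what allows find_watcher() to match the
 * current locker against the watchers registered on the header object.
 */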
3050
3051/*
3052 * lock_rwsem must be held for write
3053 */
3054static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 3055{
922dab61 3056 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
3057 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3058 char cookie[32];
e627db08 3059 int ret;
b8d70035 3060
cbbfb0ff
ID
3061 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3062 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 3063
ed95b21a
ID
3064 format_lock_cookie(rbd_dev, cookie);
3065 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3066 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3067 RBD_LOCK_TAG, "", 0);
e627db08 3068 if (ret)
ed95b21a 3069 return ret;
b8d70035 3070
ed95b21a 3071 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
cbbfb0ff 3072 strcpy(rbd_dev->lock_cookie, cookie);
ed95b21a
ID
3073 rbd_set_owner_cid(rbd_dev, &cid);
3074 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3075 return 0;
b8d70035
AE
3076}
3077
ed95b21a
ID
3078/*
3079 * lock_rwsem must be held for write
3080 */
bbead745 3081static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 3082{
922dab61 3083 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
3084 int ret;
3085
cbbfb0ff
ID
3086 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3087 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 3088
ed95b21a 3089 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 3090 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
3091 if (ret && ret != -ENOENT)
3092 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 3093
bbead745
ID
3094 /* treat errors as the image is unlocked */
3095 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 3096 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
3097 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3098 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
3099}
3100
ed95b21a
ID
3101static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3102 enum rbd_notify_op notify_op,
3103 struct page ***preply_pages,
3104 size_t *preply_len)
9969ebc5
AE
3105{
3106 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
3107 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3108 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3109 char buf[buf_size];
3110 void *p = buf;
9969ebc5 3111
ed95b21a 3112 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 3113
ed95b21a
ID
3114 /* encode *LockPayload NotifyMessage (op + ClientId) */
3115 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3116 ceph_encode_32(&p, notify_op);
3117 ceph_encode_64(&p, cid.gid);
3118 ceph_encode_64(&p, cid.handle);
8eb87565 3119
ed95b21a
ID
3120 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3121 &rbd_dev->header_oloc, buf, buf_size,
3122 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
3123}
3124
ed95b21a
ID
3125static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3126 enum rbd_notify_op notify_op)
b30a01f2 3127{
ed95b21a
ID
3128 struct page **reply_pages;
3129 size_t reply_len;
b30a01f2 3130
ed95b21a
ID
3131 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3132 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3133}
b30a01f2 3134
ed95b21a
ID
3135static void rbd_notify_acquired_lock(struct work_struct *work)
3136{
3137 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3138 acquired_lock_work);
76756a51 3139
ed95b21a 3140 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
3141}
3142
ed95b21a 3143static void rbd_notify_released_lock(struct work_struct *work)
c525f036 3144{
ed95b21a
ID
3145 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3146 released_lock_work);
811c6688 3147
ed95b21a 3148 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
3149}
3150
ed95b21a 3151static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 3152{
ed95b21a
ID
3153 struct page **reply_pages;
3154 size_t reply_len;
3155 bool lock_owner_responded = false;
36be9a76
AE
3156 int ret;
3157
ed95b21a 3158 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 3159
ed95b21a
ID
3160 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3161 &reply_pages, &reply_len);
3162 if (ret && ret != -ETIMEDOUT) {
3163 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 3164 goto out;
ed95b21a 3165 }
36be9a76 3166
ed95b21a
ID
3167 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3168 void *p = page_address(reply_pages[0]);
3169 void *const end = p + reply_len;
3170 u32 n;
36be9a76 3171
ed95b21a
ID
3172 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3173 while (n--) {
3174 u8 struct_v;
3175 u32 len;
36be9a76 3176
ed95b21a
ID
3177 ceph_decode_need(&p, end, 8 + 8, e_inval);
3178 p += 8 + 8; /* skip gid and cookie */
04017e29 3179
ed95b21a
ID
3180 ceph_decode_32_safe(&p, end, len, e_inval);
3181 if (!len)
3182 continue;
3183
3184 if (lock_owner_responded) {
3185 rbd_warn(rbd_dev,
3186 "duplicate lock owners detected");
3187 ret = -EIO;
3188 goto out;
3189 }
3190
3191 lock_owner_responded = true;
3192 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3193 &struct_v, &len);
3194 if (ret) {
3195 rbd_warn(rbd_dev,
3196 "failed to decode ResponseMessage: %d",
3197 ret);
3198 goto e_inval;
3199 }
3200
3201 ret = ceph_decode_32(&p);
3202 }
3203 }
3204
3205 if (!lock_owner_responded) {
3206 rbd_warn(rbd_dev, "no lock owners detected");
3207 ret = -ETIMEDOUT;
3208 }
3209
3210out:
3211 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3212 return ret;
3213
3214e_inval:
3215 ret = -EINVAL;
3216 goto out;
3217}
3218
3219static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3220{
3221 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3222
3223 cancel_delayed_work(&rbd_dev->lock_dwork);
3224 if (wake_all)
3225 wake_up_all(&rbd_dev->lock_waitq);
3226 else
3227 wake_up(&rbd_dev->lock_waitq);
3228}
3229
3230static int get_lock_owner_info(struct rbd_device *rbd_dev,
3231 struct ceph_locker **lockers, u32 *num_lockers)
3232{
3233 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3234 u8 lock_type;
3235 char *lock_tag;
3236 int ret;
3237
3238 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3239
3240 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3241 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3242 &lock_type, &lock_tag, lockers, num_lockers);
3243 if (ret)
3244 return ret;
3245
3246 if (*num_lockers == 0) {
3247 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3248 goto out;
3249 }
3250
3251 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3252 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3253 lock_tag);
3254 ret = -EBUSY;
3255 goto out;
3256 }
3257
3258 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3259 rbd_warn(rbd_dev, "shared lock type detected");
3260 ret = -EBUSY;
3261 goto out;
3262 }
3263
3264 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3265 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3266 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3267 (*lockers)[0].id.cookie);
3268 ret = -EBUSY;
3269 goto out;
3270 }
3271
3272out:
3273 kfree(lock_tag);
3274 return ret;
3275}
3276
3277static int find_watcher(struct rbd_device *rbd_dev,
3278 const struct ceph_locker *locker)
3279{
3280 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3281 struct ceph_watch_item *watchers;
3282 u32 num_watchers;
3283 u64 cookie;
3284 int i;
3285 int ret;
3286
3287 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3288 &rbd_dev->header_oloc, &watchers,
3289 &num_watchers);
3290 if (ret)
3291 return ret;
3292
3293 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3294 for (i = 0; i < num_watchers; i++) {
3295 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3296 sizeof(locker->info.addr)) &&
3297 watchers[i].cookie == cookie) {
3298 struct rbd_client_id cid = {
3299 .gid = le64_to_cpu(watchers[i].name.num),
3300 .handle = cookie,
3301 };
3302
3303 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3304 rbd_dev, cid.gid, cid.handle);
3305 rbd_set_owner_cid(rbd_dev, &cid);
3306 ret = 1;
3307 goto out;
3308 }
3309 }
3310
3311 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3312 ret = 0;
3313out:
3314 kfree(watchers);
3315 return ret;
3316}
3317
3318/*
3319 * lock_rwsem must be held for write
3320 */
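/*
 * Keep trying to take the exclusive lock.  If it is already held,
 * look up the holder: a holder that still has a watch on the header
 * object is alive and we fall back to requesting the lock from it,
 * while a holder with no watch is treated as dead, blacklisted, and
 * has its lock broken before we retry.
 */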
3321static int rbd_try_lock(struct rbd_device *rbd_dev)
3322{
3323 struct ceph_client *client = rbd_dev->rbd_client->client;
3324 struct ceph_locker *lockers;
3325 u32 num_lockers;
3326 int ret;
3327
3328 for (;;) {
3329 ret = rbd_lock(rbd_dev);
3330 if (ret != -EBUSY)
3331 return ret;
3332
3333 /* determine if the current lock holder is still alive */
3334 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3335 if (ret)
3336 return ret;
3337
3338 if (num_lockers == 0)
3339 goto again;
3340
3341 ret = find_watcher(rbd_dev, lockers);
3342 if (ret) {
3343 if (ret > 0)
3344 ret = 0; /* have to request lock */
3345 goto out;
3346 }
3347
3348 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3349 ENTITY_NAME(lockers[0].id.name));
3350
3351 ret = ceph_monc_blacklist_add(&client->monc,
3352 &lockers[0].info.addr);
3353 if (ret) {
3354 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3355 ENTITY_NAME(lockers[0].id.name), ret);
3356 goto out;
3357 }
3358
3359 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3360 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3361 lockers[0].id.cookie,
3362 &lockers[0].id.name);
3363 if (ret && ret != -ENOENT)
3364 goto out;
3365
3366again:
3367 ceph_free_lockers(lockers, num_lockers);
3368 }
3369
3370out:
3371 ceph_free_lockers(lockers, num_lockers);
3372 return ret;
3373}
3374
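/*
 * In short, rbd_try_lock() above loops: try to take the exclusive
 * lock; if it is already held, look up the holder and check whether it
 * still has a watch on the header object.  A holder with no watch is
 * presumed dead, so its address is blacklisted and its lock broken
 * before retrying.  A live holder means we must request the lock
 * instead (the ret > 0 -> ret = 0 case).
 */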
3375/*
3376 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3377 */
3378static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3379 int *pret)
3380{
3381 enum rbd_lock_state lock_state;
3382
3383 down_read(&rbd_dev->lock_rwsem);
3384 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3385 rbd_dev->lock_state);
3386 if (__rbd_is_lock_owner(rbd_dev)) {
3387 lock_state = rbd_dev->lock_state;
3388 up_read(&rbd_dev->lock_rwsem);
3389 return lock_state;
3390 }
3391
3392 up_read(&rbd_dev->lock_rwsem);
3393 down_write(&rbd_dev->lock_rwsem);
3394 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3395 rbd_dev->lock_state);
3396 if (!__rbd_is_lock_owner(rbd_dev)) {
3397 *pret = rbd_try_lock(rbd_dev);
3398 if (*pret)
3399 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3400 }
3401
3402 lock_state = rbd_dev->lock_state;
3403 up_write(&rbd_dev->lock_rwsem);
3404 return lock_state;
3405}
3406
3407static void rbd_acquire_lock(struct work_struct *work)
3408{
3409 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3410 struct rbd_device, lock_dwork);
3411 enum rbd_lock_state lock_state;
37f13252 3412 int ret = 0;
ed95b21a
ID
3413
3414 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3415again:
3416 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3417 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3418 if (lock_state == RBD_LOCK_STATE_LOCKED)
3419 wake_requests(rbd_dev, true);
3420 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3421 rbd_dev, lock_state, ret);
3422 return;
3423 }
3424
3425 ret = rbd_request_lock(rbd_dev);
3426 if (ret == -ETIMEDOUT) {
3427 goto again; /* treat this as a dead client */
e010dd0a
ID
3428 } else if (ret == -EROFS) {
3429 rbd_warn(rbd_dev, "peer will not release lock");
3430 /*
3431 * If this is rbd_add_acquire_lock(), we want to fail
3432 * immediately -- reuse BLACKLISTED flag. Otherwise we
3433 * want to block.
3434 */
3435 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3436 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3437 /* wake "rbd map --exclusive" process */
3438 wake_requests(rbd_dev, false);
3439 }
ed95b21a
ID
3440 } else if (ret < 0) {
3441 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3442 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3443 RBD_RETRY_DELAY);
3444 } else {
3445 /*
3446 * lock owner acked, but resend if we don't see them
3447 * release the lock
3448 */
3449 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3450 rbd_dev);
3451 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3452 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3453 }
3454}
3455
3456/*
3457 * lock_rwsem must be held for write
3458 */
3459static bool rbd_release_lock(struct rbd_device *rbd_dev)
3460{
3461 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3462 rbd_dev->lock_state);
3463 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3464 return false;
3465
3466 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3467 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3468 /*
ed95b21a 3469 * Ensure that all in-flight IO is flushed.
52bb1f9b 3470 *
ed95b21a
ID
3471 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3472 * may be shared with other devices.
52bb1f9b 3473 */
ed95b21a
ID
3474 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3475 up_read(&rbd_dev->lock_rwsem);
3476
3477 down_write(&rbd_dev->lock_rwsem);
3478 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3479 rbd_dev->lock_state);
3480 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3481 return false;
3482
bbead745
ID
3483 rbd_unlock(rbd_dev);
3484 /*
3485 * Give others a chance to grab the lock - we would re-acquire
3486 * almost immediately if we got new IO during ceph_osdc_sync()
3487 * otherwise. We need to ack our own notifications, so this
3488 * lock_dwork will be requeued from rbd_wait_state_locked()
3489 * after wake_requests() in rbd_handle_released_lock().
3490 */
3491 cancel_delayed_work(&rbd_dev->lock_dwork);
ed95b21a
ID
3492 return true;
3493}
3494
3495static void rbd_release_lock_work(struct work_struct *work)
3496{
3497 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3498 unlock_work);
3499
3500 down_write(&rbd_dev->lock_rwsem);
3501 rbd_release_lock(rbd_dev);
3502 up_write(&rbd_dev->lock_rwsem);
3503}
3504
3505static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3506 void **p)
3507{
3508 struct rbd_client_id cid = { 0 };
3509
3510 if (struct_v >= 2) {
3511 cid.gid = ceph_decode_64(p);
3512 cid.handle = ceph_decode_64(p);
3513 }
3514
3515 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3516 cid.handle);
3517 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3518 down_write(&rbd_dev->lock_rwsem);
3519 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3520 /*
3521 * we already know that the remote client is
3522 * the owner
3523 */
3524 up_write(&rbd_dev->lock_rwsem);
3525 return;
3526 }
3527
3528 rbd_set_owner_cid(rbd_dev, &cid);
3529 downgrade_write(&rbd_dev->lock_rwsem);
3530 } else {
3531 down_read(&rbd_dev->lock_rwsem);
3532 }
3533
3534 if (!__rbd_is_lock_owner(rbd_dev))
3535 wake_requests(rbd_dev, false);
3536 up_read(&rbd_dev->lock_rwsem);
3537}
3538
3539static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3540 void **p)
3541{
3542 struct rbd_client_id cid = { 0 };
3543
3544 if (struct_v >= 2) {
3545 cid.gid = ceph_decode_64(p);
3546 cid.handle = ceph_decode_64(p);
3547 }
3548
3549 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3550 cid.handle);
3551 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3552 down_write(&rbd_dev->lock_rwsem);
3553 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3554 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3555 __func__, rbd_dev, cid.gid, cid.handle,
3556 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3557 up_write(&rbd_dev->lock_rwsem);
3558 return;
3559 }
3560
3561 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3562 downgrade_write(&rbd_dev->lock_rwsem);
3563 } else {
3564 down_read(&rbd_dev->lock_rwsem);
3565 }
3566
3567 if (!__rbd_is_lock_owner(rbd_dev))
3568 wake_requests(rbd_dev, false);
3569 up_read(&rbd_dev->lock_rwsem);
3570}
3571
3b77faa0
ID
3572/*
3573 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3574 * ResponseMessage is needed.
3575 */
3576static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3577 void **p)
ed95b21a
ID
3578{
3579 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3580 struct rbd_client_id cid = { 0 };
3b77faa0 3581 int result = 1;
ed95b21a
ID
3582
3583 if (struct_v >= 2) {
3584 cid.gid = ceph_decode_64(p);
3585 cid.handle = ceph_decode_64(p);
3586 }
3587
3588 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3589 cid.handle);
3590 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3591 return result;
ed95b21a
ID
3592
3593 down_read(&rbd_dev->lock_rwsem);
3b77faa0
ID
3594 if (__rbd_is_lock_owner(rbd_dev)) {
3595 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3596 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3597 goto out_unlock;
3598
3599 /*
3600 * encode ResponseMessage(0) so the peer can detect
3601 * a missing owner
3602 */
3603 result = 0;
3604
3605 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
e010dd0a
ID
3606 if (!rbd_dev->opts->exclusive) {
3607 dout("%s rbd_dev %p queueing unlock_work\n",
3608 __func__, rbd_dev);
3609 queue_work(rbd_dev->task_wq,
3610 &rbd_dev->unlock_work);
3611 } else {
3612 /* refuse to release the lock */
3613 result = -EROFS;
3614 }
ed95b21a
ID
3615 }
3616 }
3b77faa0
ID
3617
3618out_unlock:
ed95b21a 3619 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3620 return result;
ed95b21a
ID
3621}
3622
3623static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3624 u64 notify_id, u64 cookie, s32 *result)
3625{
3626 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3627 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3628 char buf[buf_size];
3629 int ret;
3630
3631 if (result) {
3632 void *p = buf;
3633
3634 /* encode ResponseMessage */
3635 ceph_start_encoding(&p, 1, 1,
3636 buf_size - CEPH_ENCODING_START_BLK_LEN);
3637 ceph_encode_32(&p, *result);
3638 } else {
3639 buf_size = 0;
3640 }
b8d70035 3641
922dab61
ID
3642 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3643 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3644 buf, buf_size);
52bb1f9b 3645 if (ret)
ed95b21a
ID
3646 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3647}
3648
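/*
 * The notify ack payload built above is either empty (plain ack) or an
 * encoded ResponseMessage: a ceph encoding header (struct_v, compat,
 * payload length) followed by a single 32-bit result, hence
 * buf_size = 4 + CEPH_ENCODING_START_BLK_LEN.
 */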
3649static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3650 u64 cookie)
3651{
3652 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3653 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3654}
3655
3656static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3657 u64 notify_id, u64 cookie, s32 result)
3658{
3659 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3660 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3661}
3662
3663static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3664 u64 notifier_id, void *data, size_t data_len)
3665{
3666 struct rbd_device *rbd_dev = arg;
3667 void *p = data;
3668 void *const end = p + data_len;
d4c2269b 3669 u8 struct_v = 0;
ed95b21a
ID
3670 u32 len;
3671 u32 notify_op;
3672 int ret;
3673
3674 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3675 __func__, rbd_dev, cookie, notify_id, data_len);
3676 if (data_len) {
3677 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3678 &struct_v, &len);
3679 if (ret) {
3680 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3681 ret);
3682 return;
3683 }
3684
3685 notify_op = ceph_decode_32(&p);
3686 } else {
3687 /* legacy notification for header updates */
3688 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3689 len = 0;
3690 }
3691
3692 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3693 switch (notify_op) {
3694 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3695 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3696 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3697 break;
3698 case RBD_NOTIFY_OP_RELEASED_LOCK:
3699 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3700 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3701 break;
3702 case RBD_NOTIFY_OP_REQUEST_LOCK:
3b77faa0
ID
3703 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3704 if (ret <= 0)
ed95b21a 3705 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3706 cookie, ret);
ed95b21a
ID
3707 else
3708 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3709 break;
3710 case RBD_NOTIFY_OP_HEADER_UPDATE:
3711 ret = rbd_dev_refresh(rbd_dev);
3712 if (ret)
3713 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3714
3715 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3716 break;
3717 default:
3718 if (rbd_is_lock_owner(rbd_dev))
3719 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3720 cookie, -EOPNOTSUPP);
3721 else
3722 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723 break;
3724 }
b8d70035
AE
3725}
3726
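/*
 * Notify payload handled by rbd_watch_cb() above: an optional
 * NotifyMessage envelope (ceph_start_decoding()) carrying a 32-bit
 * notify_op, followed by op-specific data; for the lock notifications
 * that is the sender's client id (gid, handle), decoded by the
 * rbd_handle_*_lock() helpers.  A zero-length notify is treated as a
 * legacy header update.
 */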
99d16943
ID
3727static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3728
922dab61 3729static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3730{
922dab61 3731 struct rbd_device *rbd_dev = arg;
bb040aa0 3732
922dab61 3733 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3734
ed95b21a
ID
3735 down_write(&rbd_dev->lock_rwsem);
3736 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3737 up_write(&rbd_dev->lock_rwsem);
3738
99d16943
ID
3739 mutex_lock(&rbd_dev->watch_mutex);
3740 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3741 __rbd_unregister_watch(rbd_dev);
3742 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3743
99d16943 3744 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3745 }
99d16943 3746 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3747}
3748
9969ebc5 3749/*
99d16943 3750 * watch_mutex must be locked
9969ebc5 3751 */
99d16943 3752static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3753{
3754 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3755 struct ceph_osd_linger_request *handle;
9969ebc5 3756
922dab61 3757 rbd_assert(!rbd_dev->watch_handle);
99d16943 3758 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3759
922dab61
ID
3760 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3761 &rbd_dev->header_oloc, rbd_watch_cb,
3762 rbd_watch_errcb, rbd_dev);
3763 if (IS_ERR(handle))
3764 return PTR_ERR(handle);
8eb87565 3765
922dab61 3766 rbd_dev->watch_handle = handle;
b30a01f2 3767 return 0;
b30a01f2
ID
3768}
3769
99d16943
ID
3770/*
3771 * watch_mutex must be locked
3772 */
3773static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3774{
922dab61
ID
3775 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3776 int ret;
b30a01f2 3777
99d16943
ID
3778 rbd_assert(rbd_dev->watch_handle);
3779 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3780
922dab61
ID
3781 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3782 if (ret)
3783 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3784
922dab61 3785 rbd_dev->watch_handle = NULL;
c525f036
ID
3786}
3787
99d16943
ID
3788static int rbd_register_watch(struct rbd_device *rbd_dev)
3789{
3790 int ret;
3791
3792 mutex_lock(&rbd_dev->watch_mutex);
3793 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3794 ret = __rbd_register_watch(rbd_dev);
3795 if (ret)
3796 goto out;
3797
3798 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3799 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3800
3801out:
3802 mutex_unlock(&rbd_dev->watch_mutex);
3803 return ret;
3804}
3805
3806static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3807{
99d16943
ID
3808 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3809
3810 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ed95b21a
ID
3811 cancel_work_sync(&rbd_dev->acquired_lock_work);
3812 cancel_work_sync(&rbd_dev->released_lock_work);
3813 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3814 cancel_work_sync(&rbd_dev->unlock_work);
99d16943
ID
3815}
3816
3817static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3818{
ed95b21a 3819 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
99d16943
ID
3820 cancel_tasks_sync(rbd_dev);
3821
3822 mutex_lock(&rbd_dev->watch_mutex);
3823 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3824 __rbd_unregister_watch(rbd_dev);
3825 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3826 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3827
811c6688 3828 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3829}
3830
14bb211d
ID
3831/*
3832 * lock_rwsem must be held for write
3833 */
3834static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3835{
3836 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3837 char cookie[32];
3838 int ret;
3839
3840 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3841
3842 format_lock_cookie(rbd_dev, cookie);
3843 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3844 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3845 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3846 RBD_LOCK_TAG, cookie);
3847 if (ret) {
3848 if (ret != -EOPNOTSUPP)
3849 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3850 ret);
3851
3852 /*
3853 * Lock cookie cannot be updated on older OSDs, so do
3854 * a manual release and queue an acquire.
3855 */
3856 if (rbd_release_lock(rbd_dev))
3857 queue_delayed_work(rbd_dev->task_wq,
3858 &rbd_dev->lock_dwork, 0);
3859 } else {
3860 strcpy(rbd_dev->lock_cookie, cookie);
3861 }
3862}
3863
99d16943
ID
3864static void rbd_reregister_watch(struct work_struct *work)
3865{
3866 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3867 struct rbd_device, watch_dwork);
3868 int ret;
3869
3870 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3871
3872 mutex_lock(&rbd_dev->watch_mutex);
87c0fded
ID
3873 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3874 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3875 return;
87c0fded 3876 }
99d16943
ID
3877
3878 ret = __rbd_register_watch(rbd_dev);
3879 if (ret) {
3880 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4d73644b 3881 if (ret == -EBLACKLISTED || ret == -ENOENT) {
87c0fded 3882 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
14bb211d 3883 wake_requests(rbd_dev, true);
87c0fded 3884 } else {
99d16943
ID
3885 queue_delayed_work(rbd_dev->task_wq,
3886 &rbd_dev->watch_dwork,
3887 RBD_RETRY_DELAY);
87c0fded
ID
3888 }
3889 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3890 return;
99d16943
ID
3891 }
3892
3893 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3894 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3895 mutex_unlock(&rbd_dev->watch_mutex);
3896
14bb211d
ID
3897 down_write(&rbd_dev->lock_rwsem);
3898 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3899 rbd_reacquire_lock(rbd_dev);
3900 up_write(&rbd_dev->lock_rwsem);
3901
99d16943
ID
3902 ret = rbd_dev_refresh(rbd_dev);
3903 if (ret)
3904 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
99d16943
ID
3905}
3906
36be9a76 3907/*
f40eb349
AE
3908 * Synchronous osd object method call. Returns the number of bytes
3909 * returned in the outbound buffer, or a negative error code.
36be9a76
AE
3910 */
3911static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
ecd4a68a
ID
3912 struct ceph_object_id *oid,
3913 struct ceph_object_locator *oloc,
36be9a76 3914 const char *method_name,
4157976b 3915 const void *outbound,
36be9a76 3916 size_t outbound_size,
4157976b 3917 void *inbound,
e2a58ee5 3918 size_t inbound_size)
36be9a76 3919{
ecd4a68a
ID
3920 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3921 struct page *req_page = NULL;
3922 struct page *reply_page;
36be9a76
AE
3923 int ret;
3924
3925 /*
6010a451
AE
3926 * Method calls are ultimately read operations. The result
3927 * should be placed into the inbound buffer provided. Callers
3928 * may also supply outbound data--parameters for the object
3929 * method. Currently if this is present it will be a
3930 * snapshot id.
36be9a76 3931 */
ecd4a68a
ID
3932 if (outbound) {
3933 if (outbound_size > PAGE_SIZE)
3934 return -E2BIG;
36be9a76 3935
ecd4a68a
ID
3936 req_page = alloc_page(GFP_KERNEL);
3937 if (!req_page)
3938 return -ENOMEM;
04017e29 3939
ecd4a68a 3940 memcpy(page_address(req_page), outbound, outbound_size);
04017e29 3941 }
36be9a76 3942
ecd4a68a
ID
3943 reply_page = alloc_page(GFP_KERNEL);
3944 if (!reply_page) {
3945 if (req_page)
3946 __free_page(req_page);
3947 return -ENOMEM;
3948 }
57385b51 3949
ecd4a68a
ID
3950 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3951 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3952 reply_page, &inbound_size);
3953 if (!ret) {
3954 memcpy(inbound, page_address(reply_page), inbound_size);
3955 ret = inbound_size;
3956 }
36be9a76 3957
ecd4a68a
ID
3958 if (req_page)
3959 __free_page(req_page);
3960 __free_page(reply_page);
36be9a76
AE
3961 return ret;
3962}
3963
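/*
 * The v2 image metadata getters below (get_size, get_object_prefix,
 * get_features, get_parent, get_stripe_unit_count, get_data_pool,
 * dir_get_name) are all built on rbd_obj_method_sync().  Note the
 * single-page limit on outbound parameters (-E2BIG above).
 */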
ed95b21a
ID
3964/*
3965 * lock_rwsem must be held for read
3966 */
3967static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3968{
3969 DEFINE_WAIT(wait);
3970
3971 do {
3972 /*
3973 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3974 * and cancel_delayed_work() in wake_requests().
3975 */
3976 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3977 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3978 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3979 TASK_UNINTERRUPTIBLE);
3980 up_read(&rbd_dev->lock_rwsem);
3981 schedule();
3982 down_read(&rbd_dev->lock_rwsem);
87c0fded
ID
3983 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3984 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3985
ed95b21a
ID
3986 finish_wait(&rbd_dev->lock_waitq, &wait);
3987}
3988
7ad18afa 3989static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 3990{
7ad18afa
CH
3991 struct request *rq = blk_mq_rq_from_pdu(work);
3992 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 3993 struct rbd_img_request *img_request;
4e752f0a 3994 struct ceph_snap_context *snapc = NULL;
bc1ecc65
ID
3995 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3996 u64 length = blk_rq_bytes(rq);
6d2940c8 3997 enum obj_operation_type op_type;
4e752f0a 3998 u64 mapping_size;
80de1912 3999 bool must_be_locked;
bf0d5f50
AE
4000 int result;
4001
aebf526b
CH
4002 switch (req_op(rq)) {
4003 case REQ_OP_DISCARD:
6ac56951 4004 case REQ_OP_WRITE_ZEROES:
90e98c52 4005 op_type = OBJ_OP_DISCARD;
aebf526b
CH
4006 break;
4007 case REQ_OP_WRITE:
6d2940c8 4008 op_type = OBJ_OP_WRITE;
aebf526b
CH
4009 break;
4010 case REQ_OP_READ:
6d2940c8 4011 op_type = OBJ_OP_READ;
aebf526b
CH
4012 break;
4013 default:
4014 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4015 result = -EIO;
4016 goto err;
4017 }
6d2940c8 4018
bc1ecc65 4019 /* Ignore/skip any zero-length requests */
bf0d5f50 4020
bc1ecc65
ID
4021 if (!length) {
4022 dout("%s: zero-length request\n", __func__);
4023 result = 0;
4024 goto err_rq;
4025 }
bf0d5f50 4026
9568c93e
ID
4027 rbd_assert(op_type == OBJ_OP_READ ||
4028 rbd_dev->spec->snap_id == CEPH_NOSNAP);
4dda41d3 4029
bc1ecc65
ID
4030 /*
4031 * Quit early if the mapped snapshot no longer exists. It's
4032 * still possible the snapshot will have disappeared by the
4033 * time our request arrives at the osd, but there's no sense in
4034 * sending it if we already know.
4035 */
4036 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4037 dout("request for non-existent snapshot");
4038 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4039 result = -ENXIO;
4040 goto err_rq;
4041 }
4dda41d3 4042
bc1ecc65
ID
4043 if (offset && length > U64_MAX - offset + 1) {
4044 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4045 length);
4046 result = -EINVAL;
4047 goto err_rq; /* Shouldn't happen */
4048 }
4dda41d3 4049
7ad18afa
CH
4050 blk_mq_start_request(rq);
4051
4e752f0a
JD
4052 down_read(&rbd_dev->header_rwsem);
4053 mapping_size = rbd_dev->mapping.size;
6d2940c8 4054 if (op_type != OBJ_OP_READ) {
4e752f0a
JD
4055 snapc = rbd_dev->header.snapc;
4056 ceph_get_snap_context(snapc);
4057 }
4058 up_read(&rbd_dev->header_rwsem);
4059
4060 if (offset + length > mapping_size) {
bc1ecc65 4061 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 4062 length, mapping_size);
bc1ecc65
ID
4063 result = -EIO;
4064 goto err_rq;
4065 }
bf0d5f50 4066
f9bebd58
ID
4067 must_be_locked =
4068 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4069 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
ed95b21a
ID
4070 if (must_be_locked) {
4071 down_read(&rbd_dev->lock_rwsem);
87c0fded 4072 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
e010dd0a
ID
4073 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4074 if (rbd_dev->opts->exclusive) {
4075 rbd_warn(rbd_dev, "exclusive lock required");
4076 result = -EROFS;
4077 goto err_unlock;
4078 }
ed95b21a 4079 rbd_wait_state_locked(rbd_dev);
e010dd0a 4080 }
87c0fded
ID
4081 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4082 result = -EBLACKLISTED;
4083 goto err_unlock;
4084 }
ed95b21a
ID
4085 }
4086
6d2940c8 4087 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 4088 snapc);
bc1ecc65
ID
4089 if (!img_request) {
4090 result = -ENOMEM;
ed95b21a 4091 goto err_unlock;
bc1ecc65
ID
4092 }
4093 img_request->rq = rq;
70b16db8 4094 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 4095
90e98c52
GZ
4096 if (op_type == OBJ_OP_DISCARD)
4097 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4098 NULL);
4099 else
4100 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4101 rq->bio);
bc1ecc65
ID
4102 if (result)
4103 goto err_img_request;
bf0d5f50 4104
bc1ecc65
ID
4105 result = rbd_img_request_submit(img_request);
4106 if (result)
4107 goto err_img_request;
bf0d5f50 4108
ed95b21a
ID
4109 if (must_be_locked)
4110 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 4111 return;
bf0d5f50 4112
bc1ecc65
ID
4113err_img_request:
4114 rbd_img_request_put(img_request);
ed95b21a
ID
4115err_unlock:
4116 if (must_be_locked)
4117 up_read(&rbd_dev->lock_rwsem);
bc1ecc65
ID
4118err_rq:
4119 if (result)
4120 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 4121 obj_op_name(op_type), length, offset, result);
e96a650a 4122 ceph_put_snap_context(snapc);
7ad18afa 4123err:
2a842aca 4124 blk_mq_end_request(rq, errno_to_blk_status(result));
bc1ecc65 4125}
bf0d5f50 4126
fc17b653 4127static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
7ad18afa 4128 const struct blk_mq_queue_data *bd)
bc1ecc65 4129{
7ad18afa
CH
4130 struct request *rq = bd->rq;
4131 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 4132
7ad18afa 4133 queue_work(rbd_wq, work);
fc17b653 4134 return BLK_STS_OK;
bf0d5f50
AE
4135}
4136
602adf40
YS
4137static void rbd_free_disk(struct rbd_device *rbd_dev)
4138{
5769ed0c
ID
4139 blk_cleanup_queue(rbd_dev->disk->queue);
4140 blk_mq_free_tag_set(&rbd_dev->tag_set);
4141 put_disk(rbd_dev->disk);
a0cab924 4142 rbd_dev->disk = NULL;
602adf40
YS
4143}
4144
788e2df3 4145static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
fe5478e0
ID
4146 struct ceph_object_id *oid,
4147 struct ceph_object_locator *oloc,
4148 void *buf, int buf_len)
788e2df3
AE
4149
4150{
fe5478e0
ID
4151 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4152 struct ceph_osd_request *req;
4153 struct page **pages;
4154 int num_pages = calc_pages_for(0, buf_len);
788e2df3
AE
4155 int ret;
4156
fe5478e0
ID
4157 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4158 if (!req)
4159 return -ENOMEM;
788e2df3 4160
fe5478e0
ID
4161 ceph_oid_copy(&req->r_base_oid, oid);
4162 ceph_oloc_copy(&req->r_base_oloc, oloc);
4163 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 4164
fe5478e0 4165 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 4166 if (ret)
fe5478e0 4167 goto out_req;
788e2df3 4168
fe5478e0
ID
4169 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4170 if (IS_ERR(pages)) {
4171 ret = PTR_ERR(pages);
4172 goto out_req;
4173 }
1ceae7ef 4174
fe5478e0
ID
4175 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4176 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4177 true);
4178
4179 ceph_osdc_start_request(osdc, req, false);
4180 ret = ceph_osdc_wait_request(osdc, req);
4181 if (ret >= 0)
4182 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 4183
fe5478e0
ID
4184out_req:
4185 ceph_osdc_put_request(req);
788e2df3
AE
4186 return ret;
4187}
4188
602adf40 4189/*
662518b1
AE
4190 * Read the complete header for the given rbd device. On successful
4191 * return, the rbd_dev->header field will contain up-to-date
4192 * information about the image.
602adf40 4193 */
99a41ebc 4194static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 4195{
4156d998 4196 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 4197 u32 snap_count = 0;
4156d998
AE
4198 u64 names_size = 0;
4199 u32 want_count;
4200 int ret;
602adf40 4201
00f1f36f 4202 /*
4156d998
AE
4203 * The complete header will include an array of its 64-bit
4204 * snapshot ids, followed by the names of those snapshots as
4205 * a contiguous block of NUL-terminated strings. Note that
4206 * the number of snapshots could change by the time we read
4207 * it in, in which case we re-read it.
00f1f36f 4208 */
4156d998
AE
4209 do {
4210 size_t size;
4211
4212 kfree(ondisk);
4213
4214 size = sizeof (*ondisk);
4215 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4216 size += names_size;
4217 ondisk = kmalloc(size, GFP_KERNEL);
4218 if (!ondisk)
662518b1 4219 return -ENOMEM;
4156d998 4220
fe5478e0
ID
4221 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4222 &rbd_dev->header_oloc, ondisk, size);
4156d998 4223 if (ret < 0)
662518b1 4224 goto out;
c0cd10db 4225 if ((size_t)ret < size) {
4156d998 4226 ret = -ENXIO;
06ecc6cb
AE
4227 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4228 size, ret);
662518b1 4229 goto out;
4156d998
AE
4230 }
4231 if (!rbd_dev_ondisk_valid(ondisk)) {
4232 ret = -ENXIO;
06ecc6cb 4233 rbd_warn(rbd_dev, "invalid header");
662518b1 4234 goto out;
81e759fb 4235 }
602adf40 4236
4156d998
AE
4237 names_size = le64_to_cpu(ondisk->snap_names_len);
4238 want_count = snap_count;
4239 snap_count = le32_to_cpu(ondisk->snap_count);
4240 } while (snap_count != want_count);
00f1f36f 4241
662518b1
AE
4242 ret = rbd_header_from_disk(rbd_dev, ondisk);
4243out:
4156d998
AE
4244 kfree(ondisk);
4245
4246 return ret;
602adf40
YS
4247}
4248
15228ede
AE
4249/*
4250 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4251 * has disappeared from the (just updated) snapshot context.
4252 */
4253static void rbd_exists_validate(struct rbd_device *rbd_dev)
4254{
4255 u64 snap_id;
4256
4257 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4258 return;
4259
4260 snap_id = rbd_dev->spec->snap_id;
4261 if (snap_id == CEPH_NOSNAP)
4262 return;
4263
4264 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4265 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4266}
4267
9875201e
JD
4268static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4269{
4270 sector_t size;
9875201e
JD
4271
4272 /*
811c6688
ID
4273 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4274 * try to update its size. If REMOVING is set, updating size
4275 * is just useless work since the device can't be opened.
9875201e 4276 */
811c6688
ID
4277 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4278 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
9875201e
JD
4279 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4280 dout("setting size to %llu sectors", (unsigned long long)size);
4281 set_capacity(rbd_dev->disk, size);
4282 revalidate_disk(rbd_dev->disk);
4283 }
4284}
4285
cc4a38bd 4286static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 4287{
e627db08 4288 u64 mapping_size;
1fe5e993
AE
4289 int ret;
4290
cfbf6377 4291 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 4292 mapping_size = rbd_dev->mapping.size;
a720ae09
ID
4293
4294 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 4295 if (ret)
73e39e4d 4296 goto out;
15228ede 4297
e8f59b59
ID
4298 /*
4299 * If there is a parent, see if it has disappeared due to the
4300 * mapped image getting flattened.
4301 */
4302 if (rbd_dev->parent) {
4303 ret = rbd_dev_v2_parent_info(rbd_dev);
4304 if (ret)
73e39e4d 4305 goto out;
e8f59b59
ID
4306 }
4307
5ff1108c 4308 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 4309 rbd_dev->mapping.size = rbd_dev->header.image_size;
5ff1108c
ID
4310 } else {
4311 /* validate mapped snapshot's EXISTS flag */
4312 rbd_exists_validate(rbd_dev);
4313 }
15228ede 4314
73e39e4d 4315out:
cfbf6377 4316 up_write(&rbd_dev->header_rwsem);
73e39e4d 4317 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 4318 rbd_dev_update_size(rbd_dev);
1fe5e993 4319
73e39e4d 4320 return ret;
1fe5e993
AE
4321}
4322
d6296d39
CH
4323static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4324 unsigned int hctx_idx, unsigned int numa_node)
7ad18afa
CH
4325{
4326 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4327
4328 INIT_WORK(work, rbd_queue_workfn);
4329 return 0;
4330}
4331
f363b089 4332static const struct blk_mq_ops rbd_mq_ops = {
7ad18afa 4333 .queue_rq = rbd_queue_rq,
7ad18afa
CH
4334 .init_request = rbd_init_request,
4335};
4336
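/*
 * blk-mq plumbing: the per-request payload (cmd_size in rbd_init_disk()
 * below) is a work_struct, initialized once by rbd_init_request().
 * rbd_queue_rq() only queues that work on rbd_wq; rbd_queue_workfn()
 * then does the actual translation into an rbd_img_request.
 */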
602adf40
YS
4337static int rbd_init_disk(struct rbd_device *rbd_dev)
4338{
4339 struct gendisk *disk;
4340 struct request_queue *q;
593a9e7b 4341 u64 segment_size;
7ad18afa 4342 int err;
602adf40 4343
602adf40 4344 /* create gendisk info */
7e513d43
ID
4345 disk = alloc_disk(single_major ?
4346 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4347 RBD_MINORS_PER_MAJOR);
602adf40 4348 if (!disk)
1fcdb8aa 4349 return -ENOMEM;
602adf40 4350
f0f8cef5 4351 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 4352 rbd_dev->dev_id);
602adf40 4353 disk->major = rbd_dev->major;
dd82fff1 4354 disk->first_minor = rbd_dev->minor;
7e513d43
ID
4355 if (single_major)
4356 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
4357 disk->fops = &rbd_bd_ops;
4358 disk->private_data = rbd_dev;
4359
7ad18afa
CH
4360 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4361 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 4362 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 4363 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 4364 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
7ad18afa
CH
4365 rbd_dev->tag_set.nr_hw_queues = 1;
4366 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4367
4368 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4369 if (err)
602adf40 4370 goto out_disk;
029bcbd8 4371
7ad18afa
CH
4372 q = blk_mq_init_queue(&rbd_dev->tag_set);
4373 if (IS_ERR(q)) {
4374 err = PTR_ERR(q);
4375 goto out_tag_set;
4376 }
4377
d8a2c89c
ID
4378 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4379 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 4380
029bcbd8 4381 /* set io sizes to object size */
593a9e7b
AE
4382 segment_size = rbd_obj_bytes(&rbd_dev->header);
4383 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 4384 q->limits.max_sectors = queue_max_hw_sectors(q);
d3834fef 4385 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
593a9e7b
AE
4386 blk_queue_max_segment_size(q, segment_size);
4387 blk_queue_io_min(q, segment_size);
4388 blk_queue_io_opt(q, segment_size);
029bcbd8 4389
90e98c52
GZ
4390 /* enable the discard support */
4391 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4392 q->limits.discard_granularity = segment_size;
2bb4cd5c 4393 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
6ac56951 4394 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
90e98c52 4395
bae818ee 4396 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
dc3b17cc 4397 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
bae818ee 4398
5769ed0c
ID
4399 /*
4400 * disk_release() expects a queue ref from add_disk() and will
4401 * put it. Hold an extra ref until add_disk() is called.
4402 */
4403 WARN_ON(!blk_get_queue(q));
602adf40 4404 disk->queue = q;
602adf40
YS
4405 q->queuedata = rbd_dev;
4406
4407 rbd_dev->disk = disk;
602adf40 4408
602adf40 4409 return 0;
7ad18afa
CH
4410out_tag_set:
4411 blk_mq_free_tag_set(&rbd_dev->tag_set);
602adf40
YS
4412out_disk:
4413 put_disk(disk);
7ad18afa 4414 return err;
602adf40
YS
4415}
4416
dfc5606d
YS
4417/*
4418 sysfs
4419*/
4420
593a9e7b
AE
4421static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4422{
4423 return container_of(dev, struct rbd_device, dev);
4424}
4425
dfc5606d
YS
4426static ssize_t rbd_size_show(struct device *dev,
4427 struct device_attribute *attr, char *buf)
4428{
593a9e7b 4429 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4430
fc71d833
AE
4431 return sprintf(buf, "%llu\n",
4432 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
4433}
4434
34b13184
AE
4435/*
4436 * Note this shows the features for whatever's mapped, which is not
4437 * necessarily the base image.
4438 */
4439static ssize_t rbd_features_show(struct device *dev,
4440 struct device_attribute *attr, char *buf)
4441{
4442 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4443
4444 return sprintf(buf, "0x%016llx\n",
fc71d833 4445 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
4446}
4447
dfc5606d
YS
4448static ssize_t rbd_major_show(struct device *dev,
4449 struct device_attribute *attr, char *buf)
4450{
593a9e7b 4451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4452
fc71d833
AE
4453 if (rbd_dev->major)
4454 return sprintf(buf, "%d\n", rbd_dev->major);
4455
4456 return sprintf(buf, "(none)\n");
dd82fff1
ID
4457}
4458
4459static ssize_t rbd_minor_show(struct device *dev,
4460 struct device_attribute *attr, char *buf)
4461{
4462 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4463
dd82fff1 4464 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
4465}
4466
005a07bf
ID
4467static ssize_t rbd_client_addr_show(struct device *dev,
4468 struct device_attribute *attr, char *buf)
4469{
4470 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4471 struct ceph_entity_addr *client_addr =
4472 ceph_client_addr(rbd_dev->rbd_client->client);
4473
4474 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4475 le32_to_cpu(client_addr->nonce));
4476}
4477
dfc5606d
YS
4478static ssize_t rbd_client_id_show(struct device *dev,
4479 struct device_attribute *attr, char *buf)
602adf40 4480{
593a9e7b 4481 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4482
1dbb4399 4483 return sprintf(buf, "client%lld\n",
033268a5 4484 ceph_client_gid(rbd_dev->rbd_client->client));
602adf40
YS
4485}
4486
267fb90b
MC
4487static ssize_t rbd_cluster_fsid_show(struct device *dev,
4488 struct device_attribute *attr, char *buf)
4489{
4490 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4491
4492 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4493}
4494
0d6d1e9c
MC
4495static ssize_t rbd_config_info_show(struct device *dev,
4496 struct device_attribute *attr, char *buf)
4497{
4498 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4499
4500 return sprintf(buf, "%s\n", rbd_dev->config_info);
602adf40
YS
4501}
4502
dfc5606d
YS
4503static ssize_t rbd_pool_show(struct device *dev,
4504 struct device_attribute *attr, char *buf)
602adf40 4505{
593a9e7b 4506 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4507
0d7dbfce 4508 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
4509}
4510
9bb2f334
AE
4511static ssize_t rbd_pool_id_show(struct device *dev,
4512 struct device_attribute *attr, char *buf)
4513{
4514 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4515
0d7dbfce 4516 return sprintf(buf, "%llu\n",
fc71d833 4517 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
4518}
4519
dfc5606d
YS
4520static ssize_t rbd_name_show(struct device *dev,
4521 struct device_attribute *attr, char *buf)
4522{
593a9e7b 4523 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4524
a92ffdf8
AE
4525 if (rbd_dev->spec->image_name)
4526 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4527
4528 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
4529}
4530
589d30e0
AE
4531static ssize_t rbd_image_id_show(struct device *dev,
4532 struct device_attribute *attr, char *buf)
4533{
4534 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4535
0d7dbfce 4536 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
4537}
4538
34b13184
AE
4539/*
4540 * Shows the name of the currently-mapped snapshot (or
4541 * RBD_SNAP_HEAD_NAME for the base image).
4542 */
dfc5606d
YS
4543static ssize_t rbd_snap_show(struct device *dev,
4544 struct device_attribute *attr,
4545 char *buf)
4546{
593a9e7b 4547 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4548
0d7dbfce 4549 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
4550}
4551
92a58671
MC
4552static ssize_t rbd_snap_id_show(struct device *dev,
4553 struct device_attribute *attr, char *buf)
4554{
4555 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4556
4557 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4558}
4559
86b00e0d 4560/*
ff96128f
ID
4561 * For a v2 image, shows the chain of parent images, separated by empty
4562 * lines. For v1 images or if there is no parent, shows "(no parent
4563 * image)".
86b00e0d
AE
4564 */
4565static ssize_t rbd_parent_show(struct device *dev,
ff96128f
ID
4566 struct device_attribute *attr,
4567 char *buf)
86b00e0d
AE
4568{
4569 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4570 ssize_t count = 0;
86b00e0d 4571
ff96128f 4572 if (!rbd_dev->parent)
86b00e0d
AE
4573 return sprintf(buf, "(no parent image)\n");
4574
ff96128f
ID
4575 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4576 struct rbd_spec *spec = rbd_dev->parent_spec;
4577
4578 count += sprintf(&buf[count], "%s"
4579 "pool_id %llu\npool_name %s\n"
4580 "image_id %s\nimage_name %s\n"
4581 "snap_id %llu\nsnap_name %s\n"
4582 "overlap %llu\n",
4583 !count ? "" : "\n", /* first? */
4584 spec->pool_id, spec->pool_name,
4585 spec->image_id, spec->image_name ?: "(unknown)",
4586 spec->snap_id, spec->snap_name,
4587 rbd_dev->parent_overlap);
4588 }
4589
4590 return count;
86b00e0d
AE
4591}
4592
dfc5606d
YS
4593static ssize_t rbd_image_refresh(struct device *dev,
4594 struct device_attribute *attr,
4595 const char *buf,
4596 size_t size)
4597{
593a9e7b 4598 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4599 int ret;
602adf40 4600
cc4a38bd 4601 ret = rbd_dev_refresh(rbd_dev);
e627db08 4602 if (ret)
52bb1f9b 4603 return ret;
b813623a 4604
52bb1f9b 4605 return size;
dfc5606d 4606}
602adf40 4607
dfc5606d 4608static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4609static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4610static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4611static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4612static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4613static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4614static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4615static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4616static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4617static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4618static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4619static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
4620static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4621static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4622static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4623static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
4624
4625static struct attribute *rbd_attrs[] = {
4626 &dev_attr_size.attr,
34b13184 4627 &dev_attr_features.attr,
dfc5606d 4628 &dev_attr_major.attr,
dd82fff1 4629 &dev_attr_minor.attr,
005a07bf 4630 &dev_attr_client_addr.attr,
dfc5606d 4631 &dev_attr_client_id.attr,
267fb90b 4632 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4633 &dev_attr_config_info.attr,
dfc5606d 4634 &dev_attr_pool.attr,
9bb2f334 4635 &dev_attr_pool_id.attr,
dfc5606d 4636 &dev_attr_name.attr,
589d30e0 4637 &dev_attr_image_id.attr,
dfc5606d 4638 &dev_attr_current_snap.attr,
92a58671 4639 &dev_attr_snap_id.attr,
86b00e0d 4640 &dev_attr_parent.attr,
dfc5606d 4641 &dev_attr_refresh.attr,
dfc5606d
YS
4642 NULL
4643};
4644
4645static struct attribute_group rbd_attr_group = {
4646 .attrs = rbd_attrs,
4647};
4648
4649static const struct attribute_group *rbd_attr_groups[] = {
4650 &rbd_attr_group,
4651 NULL
4652};
4653
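/*
 * These attributes appear under sysfs for each mapped image, e.g. for
 * device id 0 (paths assumed from the rbd bus layout):
 *
 *   /sys/bus/rbd/devices/0/size
 *   /sys/bus/rbd/devices/0/current_snap
 *   /sys/bus/rbd/devices/0/parent
 *
 * "refresh" is write-only; writing to it re-reads the image header via
 * rbd_image_refresh() above.
 */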
6cac4695 4654static void rbd_dev_release(struct device *dev);
dfc5606d 4655
b9942bc9 4656static const struct device_type rbd_device_type = {
dfc5606d
YS
4657 .name = "rbd",
4658 .groups = rbd_attr_groups,
6cac4695 4659 .release = rbd_dev_release,
dfc5606d
YS
4660};
4661
8b8fb99c
AE
4662static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4663{
4664 kref_get(&spec->kref);
4665
4666 return spec;
4667}
4668
4669static void rbd_spec_free(struct kref *kref);
4670static void rbd_spec_put(struct rbd_spec *spec)
4671{
4672 if (spec)
4673 kref_put(&spec->kref, rbd_spec_free);
4674}
4675
4676static struct rbd_spec *rbd_spec_alloc(void)
4677{
4678 struct rbd_spec *spec;
4679
4680 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4681 if (!spec)
4682 return NULL;
04077599
ID
4683
4684 spec->pool_id = CEPH_NOPOOL;
4685 spec->snap_id = CEPH_NOSNAP;
8b8fb99c
AE
4686 kref_init(&spec->kref);
4687
8b8fb99c
AE
4688 return spec;
4689}
4690
4691static void rbd_spec_free(struct kref *kref)
4692{
4693 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4694
4695 kfree(spec->pool_name);
4696 kfree(spec->image_id);
4697 kfree(spec->image_name);
4698 kfree(spec->snap_name);
4699 kfree(spec);
4700}
4701
1643dfa4 4702static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4703{
99d16943 4704 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4705 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4706
c41d13a3 4707 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4708 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4709 kfree(rbd_dev->config_info);
c41d13a3 4710
dd5ac32d
ID
4711 rbd_put_client(rbd_dev->rbd_client);
4712 rbd_spec_put(rbd_dev->spec);
4713 kfree(rbd_dev->opts);
4714 kfree(rbd_dev);
1643dfa4
ID
4715}
4716
4717static void rbd_dev_release(struct device *dev)
4718{
4719 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4720 bool need_put = !!rbd_dev->opts;
4721
4722 if (need_put) {
4723 destroy_workqueue(rbd_dev->task_wq);
4724 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4725 }
4726
4727 rbd_dev_free(rbd_dev);
dd5ac32d
ID
4728
4729 /*
4730 * This is racy, but way better than dropping the module reference
4731 * outside of the release callback. The race window is pretty small, so
4732 * doing something similar to dm (dm-builtin.c) is overkill.
4733 */
4734 if (need_put)
4735 module_put(THIS_MODULE);
4736}
4737
1643dfa4
ID
4738static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4739 struct rbd_spec *spec)
c53d5893
AE
4740{
4741 struct rbd_device *rbd_dev;
4742
1643dfa4 4743 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
c53d5893
AE
4744 if (!rbd_dev)
4745 return NULL;
4746
4747 spin_lock_init(&rbd_dev->lock);
4748 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4749 init_rwsem(&rbd_dev->header_rwsem);
4750
7e97332e 4751 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4752 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4753 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4754
99d16943
ID
4755 mutex_init(&rbd_dev->watch_mutex);
4756 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4757 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4758
ed95b21a
ID
4759 init_rwsem(&rbd_dev->lock_rwsem);
4760 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4761 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4762 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4763 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4764 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4765 init_waitqueue_head(&rbd_dev->lock_waitq);
4766
dd5ac32d
ID
4767 rbd_dev->dev.bus = &rbd_bus_type;
4768 rbd_dev->dev.type = &rbd_device_type;
4769 rbd_dev->dev.parent = &rbd_root_dev;
dd5ac32d
ID
4770 device_initialize(&rbd_dev->dev);
4771
c53d5893 4772 rbd_dev->rbd_client = rbdc;
d147543d 4773 rbd_dev->spec = spec;
0903e875 4774
1643dfa4
ID
4775 return rbd_dev;
4776}
4777
4778/*
4779 * Create a mapping rbd_dev.
4780 */
4781static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4782 struct rbd_spec *spec,
4783 struct rbd_options *opts)
4784{
4785 struct rbd_device *rbd_dev;
4786
4787 rbd_dev = __rbd_dev_create(rbdc, spec);
4788 if (!rbd_dev)
4789 return NULL;
4790
4791 rbd_dev->opts = opts;
4792
4793 /* get an id and fill in device name */
4794 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4795 minor_to_rbd_dev_id(1 << MINORBITS),
4796 GFP_KERNEL);
4797 if (rbd_dev->dev_id < 0)
4798 goto fail_rbd_dev;
4799
4800 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4801 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4802 rbd_dev->name);
4803 if (!rbd_dev->task_wq)
4804 goto fail_dev_id;
dd5ac32d 4805
1643dfa4
ID
4806 /* we have a ref from do_rbd_add() */
4807 __module_get(THIS_MODULE);
dd5ac32d 4808
1643dfa4 4809 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4810 return rbd_dev;
1643dfa4
ID
4811
4812fail_dev_id:
4813 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4814fail_rbd_dev:
4815 rbd_dev_free(rbd_dev);
4816 return NULL;
c53d5893
AE
4817}
4818
4819static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4820{
dd5ac32d
ID
4821 if (rbd_dev)
4822 put_device(&rbd_dev->dev);
c53d5893
AE
4823}
4824
9d475de5
AE
4825/*
4826 * Get the size and object order for an image snapshot, or if
4827 * snap_id is CEPH_NOSNAP, gets this information for the base
4828 * image.
4829 */
4830static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4831 u8 *order, u64 *snap_size)
4832{
4833 __le64 snapid = cpu_to_le64(snap_id);
4834 int ret;
4835 struct {
4836 u8 order;
4837 __le64 size;
4838 } __attribute__ ((packed)) size_buf = { 0 };
4839
ecd4a68a
ID
4840 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4841 &rbd_dev->header_oloc, "get_size",
4842 &snapid, sizeof(snapid),
4843 &size_buf, sizeof(size_buf));
36be9a76 4844 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
4845 if (ret < 0)
4846 return ret;
57385b51
AE
4847 if (ret < sizeof (size_buf))
4848 return -ERANGE;
9d475de5 4849
c3545579 4850 if (order) {
c86f86e9 4851 *order = size_buf.order;
c3545579
JD
4852 dout(" order %u", (unsigned int)*order);
4853 }
9d475de5
AE
4854 *snap_size = le64_to_cpu(size_buf.size);
4855
c3545579
JD
4856 dout(" snap_id 0x%016llx snap_size = %llu\n",
4857 (unsigned long long)snap_id,
57385b51 4858 (unsigned long long)*snap_size);
9d475de5
AE
4859
4860 return 0;
4861}
4862
4863static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4864{
4865 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4866 &rbd_dev->header.obj_order,
4867 &rbd_dev->header.image_size);
4868}
4869
1e130199
AE
4870static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4871{
4872 void *reply_buf;
4873 int ret;
4874 void *p;
4875
4876 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4877 if (!reply_buf)
4878 return -ENOMEM;
4879
ecd4a68a
ID
4880 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4881 &rbd_dev->header_oloc, "get_object_prefix",
4882 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4883 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
4884 if (ret < 0)
4885 goto out;
4886
4887 p = reply_buf;
4888 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
4889 p + ret, NULL, GFP_NOIO);
4890 ret = 0;
1e130199
AE
4891
4892 if (IS_ERR(rbd_dev->header.object_prefix)) {
4893 ret = PTR_ERR(rbd_dev->header.object_prefix);
4894 rbd_dev->header.object_prefix = NULL;
4895 } else {
4896 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4897 }
1e130199
AE
4898out:
4899 kfree(reply_buf);
4900
4901 return ret;
4902}
4903
b1b5402a
AE
4904static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4905 u64 *snap_features)
4906{
4907 __le64 snapid = cpu_to_le64(snap_id);
4908 struct {
4909 __le64 features;
4910 __le64 incompat;
4157976b 4911 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4912 u64 unsup;
b1b5402a
AE
4913 int ret;
4914
ecd4a68a
ID
4915 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4916 &rbd_dev->header_oloc, "get_features",
4917 &snapid, sizeof(snapid),
4918 &features_buf, sizeof(features_buf));
36be9a76 4919 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
4920 if (ret < 0)
4921 return ret;
57385b51
AE
4922 if (ret < sizeof (features_buf))
4923 return -ERANGE;
d889140c 4924
d3767f0f
ID
4925 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4926 if (unsup) {
4927 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4928 unsup);
b8f5c6ed 4929 return -ENXIO;
d3767f0f 4930 }
d889140c 4931
b1b5402a
AE
4932 *snap_features = le64_to_cpu(features_buf.features);
4933
4934 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
4935 (unsigned long long)snap_id,
4936 (unsigned long long)*snap_features,
4937 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
4938
4939 return 0;
4940}
4941
4942static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4943{
4944 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4945 &rbd_dev->header.features);
4946}
4947
86b00e0d
AE
4948static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4949{
4950 struct rbd_spec *parent_spec;
4951 size_t size;
4952 void *reply_buf = NULL;
4953 __le64 snapid;
4954 void *p;
4955 void *end;
642a2537 4956 u64 pool_id;
86b00e0d 4957 char *image_id;
3b5cf2a2 4958 u64 snap_id;
86b00e0d 4959 u64 overlap;
86b00e0d
AE
4960 int ret;
4961
4962 parent_spec = rbd_spec_alloc();
4963 if (!parent_spec)
4964 return -ENOMEM;
4965
4966 size = sizeof (__le64) + /* pool_id */
4967 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4968 sizeof (__le64) + /* snap_id */
4969 sizeof (__le64); /* overlap */
4970 reply_buf = kmalloc(size, GFP_KERNEL);
4971 if (!reply_buf) {
4972 ret = -ENOMEM;
4973 goto out_err;
4974 }
4975
4d9b67cd 4976 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
ecd4a68a
ID
4977 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4978 &rbd_dev->header_oloc, "get_parent",
4979 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4980 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
4981 if (ret < 0)
4982 goto out_err;
4983
86b00e0d 4984 p = reply_buf;
57385b51
AE
4985 end = reply_buf + ret;
4986 ret = -ERANGE;
642a2537 4987 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
4988 if (pool_id == CEPH_NOPOOL) {
4989 /*
4990 * Either the parent never existed, or we have a
4991 * record of it but the image got flattened so it no
4992 * longer has a parent. When the parent of a
4993 * layered image disappears we immediately set the
4994 * overlap to 0. The effect of this is that all new
4995 * requests will be treated as if the image had no
4996 * parent.
4997 */
4998 if (rbd_dev->parent_overlap) {
4999 rbd_dev->parent_overlap = 0;
392a9dad
AE
5000 rbd_dev_parent_put(rbd_dev);
5001 pr_info("%s: clone image has been flattened\n",
5002 rbd_dev->disk->disk_name);
5003 }
5004
86b00e0d 5005 goto out; /* No parent? No problem. */
392a9dad 5006 }
86b00e0d 5007
0903e875
AE
5008 /* The ceph file layout needs to fit pool id in 32 bits */
5009
5010 ret = -EIO;
642a2537 5011 if (pool_id > (u64)U32_MAX) {
9584d508 5012 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 5013 (unsigned long long)pool_id, U32_MAX);
57385b51 5014 goto out_err;
c0cd10db 5015 }
0903e875 5016
979ed480 5017 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
5018 if (IS_ERR(image_id)) {
5019 ret = PTR_ERR(image_id);
5020 goto out_err;
5021 }
3b5cf2a2 5022 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
5023 ceph_decode_64_safe(&p, end, overlap, out_err);
5024
3b5cf2a2
AE
5025 /*
5026 * The parent won't change (except when the clone is
5027 * flattened, already handled that). So we only need to
5028 * record the parent spec if we have not already done so.
5029 */
5030 if (!rbd_dev->parent_spec) {
5031 parent_spec->pool_id = pool_id;
5032 parent_spec->image_id = image_id;
5033 parent_spec->snap_id = snap_id;
70cf49cf
AE
5034 rbd_dev->parent_spec = parent_spec;
5035 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
5036 } else {
5037 kfree(image_id);
3b5cf2a2
AE
5038 }
5039
5040 /*
cf32bd9c
ID
5041 * We always update the parent overlap. If it's zero we issue
5042 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 5043 */
3b5cf2a2 5044 if (!overlap) {
3b5cf2a2 5045 if (parent_spec) {
cf32bd9c
ID
5046 /* refresh, careful to warn just once */
5047 if (rbd_dev->parent_overlap)
5048 rbd_warn(rbd_dev,
5049 "clone now standalone (overlap became 0)");
3b5cf2a2 5050 } else {
cf32bd9c
ID
5051 /* initial probe */
5052 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5053 }
70cf49cf 5054 }
cf32bd9c
ID
5055 rbd_dev->parent_overlap = overlap;
5056
86b00e0d
AE
5057out:
5058 ret = 0;
5059out_err:
5060 kfree(reply_buf);
5061 rbd_spec_put(parent_spec);
5062
5063 return ret;
5064}
5065
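/*
 * Reply layout for the "get_parent" call decoded above: pool_id
 * (le64), image_id (length-prefixed string), snap_id (le64) and
 * overlap (le64).  pool_id == CEPH_NOPOOL means there is no parent,
 * either because it never existed or because the clone was flattened.
 */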
cc070d59
AE
5066static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5067{
5068 struct {
5069 __le64 stripe_unit;
5070 __le64 stripe_count;
5071 } __attribute__ ((packed)) striping_info_buf = { 0 };
5072 size_t size = sizeof (striping_info_buf);
5073 void *p;
5074 u64 obj_size;
5075 u64 stripe_unit;
5076 u64 stripe_count;
5077 int ret;
5078
ecd4a68a
ID
5079 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5080 &rbd_dev->header_oloc, "get_stripe_unit_count",
5081 NULL, 0, &striping_info_buf, size);
cc070d59
AE
5082 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5083 if (ret < 0)
5084 return ret;
5085 if (ret < size)
5086 return -ERANGE;
5087
5088 /*
5089 * We don't actually support the "fancy striping" feature
5090 * (STRIPINGV2) yet, but if the striping sizes are the
5091 * defaults the behavior is the same as before. So find
5092 * out, and only fail if the image has non-default values.
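 * ("Non-default" here means a stripe unit other than the object size,
 * or a stripe count other than 1.)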
5093 */
5094 ret = -EINVAL;
5bc3fb17 5095 obj_size = rbd_obj_bytes(&rbd_dev->header);
cc070d59
AE
5096 p = &striping_info_buf;
5097 stripe_unit = ceph_decode_64(&p);
5098 if (stripe_unit != obj_size) {
5099 rbd_warn(rbd_dev, "unsupported stripe unit "
5100 "(got %llu want %llu)",
5101 stripe_unit, obj_size);
5102 return -EINVAL;
5103 }
5104 stripe_count = ceph_decode_64(&p);
5105 if (stripe_count != 1) {
5106 rbd_warn(rbd_dev, "unsupported stripe count "
5107 "(got %llu want 1)", stripe_count);
5108 return -EINVAL;
5109 }
500d0c0f
AE
5110 rbd_dev->header.stripe_unit = stripe_unit;
5111 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
5112
5113 return 0;
5114}
5115
7e97332e
ID
5116static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5117{
5118 __le64 data_pool_id;
5119 int ret;
5120
5121 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5122 &rbd_dev->header_oloc, "get_data_pool",
5123 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5124 if (ret < 0)
5125 return ret;
5126 if (ret < sizeof(data_pool_id))
5127 return -EBADMSG;
5128
5129 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5130 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5131 return 0;
5132}
5133
9e15b77d
AE
5134static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5135{
ecd4a68a 5136 CEPH_DEFINE_OID_ONSTACK(oid);
9e15b77d
AE
5137 size_t image_id_size;
5138 char *image_id;
5139 void *p;
5140 void *end;
5141 size_t size;
5142 void *reply_buf = NULL;
5143 size_t len = 0;
5144 char *image_name = NULL;
5145 int ret;
5146
5147 rbd_assert(!rbd_dev->spec->image_name);
5148
69e7a02f
AE
5149 len = strlen(rbd_dev->spec->image_id);
5150 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
5151 image_id = kmalloc(image_id_size, GFP_KERNEL);
5152 if (!image_id)
5153 return NULL;
5154
5155 p = image_id;
4157976b 5156 end = image_id + image_id_size;
57385b51 5157 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
5158
5159 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5160 reply_buf = kmalloc(size, GFP_KERNEL);
5161 if (!reply_buf)
5162 goto out;
5163
ecd4a68a
ID
5164 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5165 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5166 "dir_get_name", image_id, image_id_size,
5167 reply_buf, size);
9e15b77d
AE
5168 if (ret < 0)
5169 goto out;
5170 p = reply_buf;
f40eb349
AE
5171 end = reply_buf + ret;
5172
9e15b77d
AE
5173 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5174 if (IS_ERR(image_name))
5175 image_name = NULL;
5176 else
5177 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5178out:
5179 kfree(reply_buf);
5180 kfree(image_id);
5181
5182 return image_name;
5183}
5184
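/*
 * For format 1 images the snapshot names are kept in a single buffer of
 * consecutive NUL-terminated strings (header.snap_names), in the same
 * order as the id array in the snap context; walk both in lockstep.
 */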
2ad3d716
AE
5185static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5186{
5187 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5188 const char *snap_name;
5189 u32 which = 0;
5190
5191 /* Skip over names until we find the one we are looking for */
5192
5193 snap_name = rbd_dev->header.snap_names;
5194 while (which < snapc->num_snaps) {
5195 if (!strcmp(name, snap_name))
5196 return snapc->snaps[which];
5197 snap_name += strlen(snap_name) + 1;
5198 which++;
5199 }
5200 return CEPH_NOSNAP;
5201}
5202
5203static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5204{
5205 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5206 u32 which;
5207 bool found = false;
5208 u64 snap_id;
5209
5210 for (which = 0; !found && which < snapc->num_snaps; which++) {
5211 const char *snap_name;
5212
5213 snap_id = snapc->snaps[which];
5214 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
5215 if (IS_ERR(snap_name)) {
5216 /* ignore no-longer existing snapshots */
5217 if (PTR_ERR(snap_name) == -ENOENT)
5218 continue;
5219 else
5220 break;
5221 }
2ad3d716
AE
5222 found = !strcmp(name, snap_name);
5223 kfree(snap_name);
5224 }
5225 return found ? snap_id : CEPH_NOSNAP;
5226}
5227
5228/*
5229 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5230 * no snapshot by that name is found, or if an error occurs.
5231 */
5232static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5233{
5234 if (rbd_dev->image_format == 1)
5235 return rbd_v1_snap_id_by_name(rbd_dev, name);
5236
5237 return rbd_v2_snap_id_by_name(rbd_dev, name);
5238}
5239
9e15b77d 5240/*
04077599
ID
5241 * An image being mapped will have everything but the snap id.
5242 */
5243static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5244{
5245 struct rbd_spec *spec = rbd_dev->spec;
5246
5247 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5248 rbd_assert(spec->image_id && spec->image_name);
5249 rbd_assert(spec->snap_name);
5250
5251 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5252 u64 snap_id;
5253
5254 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5255 if (snap_id == CEPH_NOSNAP)
5256 return -ENOENT;
5257
5258 spec->snap_id = snap_id;
5259 } else {
5260 spec->snap_id = CEPH_NOSNAP;
5261 }
5262
5263 return 0;
5264}
5265
5266/*
5267 * A parent image will have all ids but none of the names.
e1d4213f 5268 *
04077599
ID
5269 * All names in an rbd spec are dynamically allocated. It's OK if we
5270 * can't figure out the name for an image id.
9e15b77d 5271 */
04077599 5272static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 5273{
2e9f7f1c
AE
5274 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5275 struct rbd_spec *spec = rbd_dev->spec;
5276 const char *pool_name;
5277 const char *image_name;
5278 const char *snap_name;
9e15b77d
AE
5279 int ret;
5280
04077599
ID
5281 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5282 rbd_assert(spec->image_id);
5283 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 5284
2e9f7f1c 5285 /* Get the pool name; we have to make our own copy of this */
9e15b77d 5286
2e9f7f1c
AE
5287 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5288 if (!pool_name) {
5289 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
5290 return -EIO;
5291 }
2e9f7f1c
AE
5292 pool_name = kstrdup(pool_name, GFP_KERNEL);
5293 if (!pool_name)
9e15b77d
AE
5294 return -ENOMEM;
5295
5296 /* Fetch the image name; tolerate failure here */
5297
2e9f7f1c
AE
5298 image_name = rbd_dev_image_name(rbd_dev);
5299 if (!image_name)
06ecc6cb 5300 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 5301
04077599 5302 /* Fetch the snapshot name */
9e15b77d 5303
2e9f7f1c 5304 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
5305 if (IS_ERR(snap_name)) {
5306 ret = PTR_ERR(snap_name);
9e15b77d 5307 goto out_err;
2e9f7f1c
AE
5308 }
5309
5310 spec->pool_name = pool_name;
5311 spec->image_name = image_name;
5312 spec->snap_name = snap_name;
9e15b77d
AE
5313
5314 return 0;
04077599 5315
9e15b77d 5316out_err:
2e9f7f1c
AE
5317 kfree(image_name);
5318 kfree(pool_name);
9e15b77d
AE
5319 return ret;
5320}
5321
cc4a38bd 5322static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
5323{
5324 size_t size;
5325 int ret;
5326 void *reply_buf;
5327 void *p;
5328 void *end;
5329 u64 seq;
5330 u32 snap_count;
5331 struct ceph_snap_context *snapc;
5332 u32 i;
5333
5334 /*
5335 * We'll need room for the seq value (maximum snapshot id),
5336 * snapshot count, and array of that many snapshot ids.
5337 * For now we have a fixed upper limit on the number we're
5338 * prepared to receive.
5339 */
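 /*
  * Illustrative arithmetic: with RBD_MAX_SNAP_COUNT of 510 this works
  * out to 8 + 4 + 510 * 8 = 4092 bytes, so the reply buffer still fits
  * in a single 4 KiB allocation.
  */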
5340 size = sizeof (__le64) + sizeof (__le32) +
5341 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5342 reply_buf = kzalloc(size, GFP_KERNEL);
5343 if (!reply_buf)
5344 return -ENOMEM;
5345
ecd4a68a
ID
5346 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5347 &rbd_dev->header_oloc, "get_snapcontext",
5348 NULL, 0, reply_buf, size);
36be9a76 5349 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
5350 if (ret < 0)
5351 goto out;
5352
35d489f9 5353 p = reply_buf;
57385b51
AE
5354 end = reply_buf + ret;
5355 ret = -ERANGE;
35d489f9
AE
5356 ceph_decode_64_safe(&p, end, seq, out);
5357 ceph_decode_32_safe(&p, end, snap_count, out);
5358
5359 /*
5360 * Make sure the reported number of snapshot ids wouldn't go
5361 * beyond the end of our buffer. But before checking that,
5362 * make sure the computed size of the snapshot context we
5363 * allocate is representable in a size_t.
5364 */
5365 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5366 / sizeof (u64)) {
5367 ret = -EINVAL;
5368 goto out;
5369 }
5370 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5371 goto out;
468521c1 5372 ret = 0;
35d489f9 5373
812164f8 5374 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
5375 if (!snapc) {
5376 ret = -ENOMEM;
5377 goto out;
5378 }
35d489f9 5379 snapc->seq = seq;
35d489f9
AE
5380 for (i = 0; i < snap_count; i++)
5381 snapc->snaps[i] = ceph_decode_64(&p);
5382
49ece554 5383 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
5384 rbd_dev->header.snapc = snapc;
5385
5386 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 5387 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
5388out:
5389 kfree(reply_buf);
5390
57385b51 5391 return ret;
35d489f9
AE
5392}
5393
54cac61f
AE
5394static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5395 u64 snap_id)
b8b1e2db
AE
5396{
5397 size_t size;
5398 void *reply_buf;
54cac61f 5399 __le64 snapid;
b8b1e2db
AE
5400 int ret;
5401 void *p;
5402 void *end;
b8b1e2db
AE
5403 char *snap_name;
5404
5405 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5406 reply_buf = kmalloc(size, GFP_KERNEL);
5407 if (!reply_buf)
5408 return ERR_PTR(-ENOMEM);
5409
54cac61f 5410 snapid = cpu_to_le64(snap_id);
ecd4a68a
ID
5411 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5412 &rbd_dev->header_oloc, "get_snapshot_name",
5413 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 5414 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
5415 if (ret < 0) {
5416 snap_name = ERR_PTR(ret);
b8b1e2db 5417 goto out;
f40eb349 5418 }
b8b1e2db
AE
5419
5420 p = reply_buf;
f40eb349 5421 end = reply_buf + ret;
e5c35534 5422 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 5423 if (IS_ERR(snap_name))
b8b1e2db 5424 goto out;
b8b1e2db 5425
f40eb349 5426 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 5427 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
5428out:
5429 kfree(reply_buf);
5430
f40eb349 5431 return snap_name;
b8b1e2db
AE
5432}
5433
2df3fac7 5434static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 5435{
2df3fac7 5436 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 5437 int ret;
117973fb 5438
1617e40c
JD
5439 ret = rbd_dev_v2_image_size(rbd_dev);
5440 if (ret)
cfbf6377 5441 return ret;
1617e40c 5442
2df3fac7
AE
5443 if (first_time) {
5444 ret = rbd_dev_v2_header_onetime(rbd_dev);
5445 if (ret)
cfbf6377 5446 return ret;
2df3fac7
AE
5447 }
5448
cc4a38bd 5449 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
5450 if (ret && first_time) {
5451 kfree(rbd_dev->header.object_prefix);
5452 rbd_dev->header.object_prefix = NULL;
5453 }
117973fb
AE
5454
5455 return ret;
5456}
5457
a720ae09
ID
5458static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5459{
5460 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5461
5462 if (rbd_dev->image_format == 1)
5463 return rbd_dev_v1_header_info(rbd_dev);
5464
5465 return rbd_dev_v2_header_info(rbd_dev);
5466}
5467
e28fff26
AE
5468/*
5469 * Skips over white space at *buf, and updates *buf to point to the
5470 * first found non-space character (if any). Returns the length of
593a9e7b
AE
5471 * the token (string of non-white space characters) found. Note
5472 * that *buf must be terminated with '\0'.
e28fff26
AE
5473 */
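/*
 * Illustrative example: with *buf pointing at "  rbd foo", next_token()
 * advances *buf past the two leading spaces (so it points at "rbd foo")
 * and returns 3, the length of the "rbd" token.
 */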
5474static inline size_t next_token(const char **buf)
5475{
5476 /*
5477 * These are the characters that produce nonzero for
5478 * isspace() in the "C" and "POSIX" locales.
5479 */
5480 const char *spaces = " \f\n\r\t\v";
5481
5482 *buf += strspn(*buf, spaces); /* Find start of token */
5483
5484 return strcspn(*buf, spaces); /* Return token length */
5485}
5486
ea3352f4
AE
5487/*
5488 * Finds the next token in *buf, dynamically allocates a buffer big
5489 * enough to hold a copy of it, and copies the token into the new
5490 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5491 * that a duplicate buffer is created even for a zero-length token.
5492 *
5493 * Returns a pointer to the newly-allocated duplicate, or a null
5494 * pointer if memory for the duplicate was not available. If
5495 * the lenp argument is a non-null pointer, the length of the token
5496 * (not including the '\0') is returned in *lenp.
5497 *
5498 * If successful, the *buf pointer will be updated to point beyond
5499 * the end of the found token.
5500 *
5501 * Note: uses GFP_KERNEL for allocation.
5502 */
5503static inline char *dup_token(const char **buf, size_t *lenp)
5504{
5505 char *dup;
5506 size_t len;
5507
5508 len = next_token(buf);
4caf35f9 5509 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5510 if (!dup)
5511 return NULL;
ea3352f4
AE
5512 *(dup + len) = '\0';
5513 *buf += len;
5514
5515 if (lenp)
5516 *lenp = len;
5517
5518 return dup;
5519}
5520
a725f65e 5521/*
859c31df
AE
5522 * Parse the options provided for an "rbd add" (i.e., rbd image
5523 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5524 * and the data written is passed here via a NUL-terminated buffer.
5525 * Returns 0 if successful or an error code otherwise.
d22f76e7 5526 *
859c31df
AE
5527 * The information extracted from these options is recorded in
5528 * the other parameters which return dynamically-allocated
5529 * structures:
5530 * ceph_opts
5531 * The address of a pointer that will refer to a ceph options
5532 * structure. Caller must release the returned pointer using
5533 * ceph_destroy_options() when it is no longer needed.
5534 * rbd_opts
5535 * Address of an rbd options pointer. Fully initialized by
5536 * this function; caller must release with kfree().
5537 * spec
5538 * Address of an rbd image specification pointer. Fully
5539 * initialized by this function based on parsed options.
5540 * Caller must release with rbd_spec_put().
5541 *
5542 * The options passed take this form:
 5543 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5544 * where:
5545 * <mon_addrs>
5546 * A comma-separated list of one or more monitor addresses.
5547 * A monitor address is an ip address, optionally followed
5548 * by a port number (separated by a colon).
5549 * I.e.: ip1[:port1][,ip2[:port2]...]
5550 * <options>
5551 * A comma-separated list of ceph and/or rbd options.
5552 * <pool_name>
5553 * The name of the rados pool containing the rbd image.
5554 * <image_name>
5555 * The name of the image in that pool to map.
 5556 * <snap_name>
 5557 * An optional snapshot name. If provided, the mapping will
 5558 * present data from the image at the time that snapshot was
 5559 * created. The image head is used if no snapshot name is
5560 * provided. Snapshot mappings are always read-only.
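 *
 * An illustrative example of such a write (all values made up):
 *
 *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage -" \
 *       > /sys/bus/rbd/add
 *
 * maps the head of image "myimage" in pool "rbd" using the single
 * monitor at 1.2.3.4:6789.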
a725f65e 5561 */
859c31df 5562static int rbd_add_parse_args(const char *buf,
dc79b113 5563 struct ceph_options **ceph_opts,
859c31df
AE
5564 struct rbd_options **opts,
5565 struct rbd_spec **rbd_spec)
e28fff26 5566{
d22f76e7 5567 size_t len;
859c31df 5568 char *options;
0ddebc0c 5569 const char *mon_addrs;
ecb4dc22 5570 char *snap_name;
0ddebc0c 5571 size_t mon_addrs_size;
859c31df 5572 struct rbd_spec *spec = NULL;
4e9afeba 5573 struct rbd_options *rbd_opts = NULL;
859c31df 5574 struct ceph_options *copts;
dc79b113 5575 int ret;
e28fff26
AE
5576
5577 /* The first four tokens are required */
5578
7ef3214a 5579 len = next_token(&buf);
4fb5d671
AE
5580 if (!len) {
5581 rbd_warn(NULL, "no monitor address(es) provided");
5582 return -EINVAL;
5583 }
0ddebc0c 5584 mon_addrs = buf;
f28e565a 5585 mon_addrs_size = len + 1;
7ef3214a 5586 buf += len;
a725f65e 5587
dc79b113 5588 ret = -EINVAL;
f28e565a
AE
5589 options = dup_token(&buf, NULL);
5590 if (!options)
dc79b113 5591 return -ENOMEM;
4fb5d671
AE
5592 if (!*options) {
5593 rbd_warn(NULL, "no options provided");
5594 goto out_err;
5595 }
e28fff26 5596
859c31df
AE
5597 spec = rbd_spec_alloc();
5598 if (!spec)
f28e565a 5599 goto out_mem;
859c31df
AE
5600
5601 spec->pool_name = dup_token(&buf, NULL);
5602 if (!spec->pool_name)
5603 goto out_mem;
4fb5d671
AE
5604 if (!*spec->pool_name) {
5605 rbd_warn(NULL, "no pool name provided");
5606 goto out_err;
5607 }
e28fff26 5608
69e7a02f 5609 spec->image_name = dup_token(&buf, NULL);
859c31df 5610 if (!spec->image_name)
f28e565a 5611 goto out_mem;
4fb5d671
AE
5612 if (!*spec->image_name) {
5613 rbd_warn(NULL, "no image name provided");
5614 goto out_err;
5615 }
d4b125e9 5616
f28e565a
AE
5617 /*
5618 * Snapshot name is optional; default is to use "-"
5619 * (indicating the head/no snapshot).
5620 */
3feeb894 5621 len = next_token(&buf);
820a5f3e 5622 if (!len) {
3feeb894
AE
5623 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5624 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5625 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5626 ret = -ENAMETOOLONG;
f28e565a 5627 goto out_err;
849b4260 5628 }
ecb4dc22
AE
5629 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5630 if (!snap_name)
f28e565a 5631 goto out_mem;
ecb4dc22
AE
5632 *(snap_name + len) = '\0';
5633 spec->snap_name = snap_name;
e5c35534 5634
0ddebc0c 5635 /* Initialize all rbd options to the defaults */
e28fff26 5636
4e9afeba
AE
5637 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5638 if (!rbd_opts)
5639 goto out_mem;
5640
5641 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5642 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5643 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5644 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d22f76e7 5645
859c31df 5646 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5647 mon_addrs + mon_addrs_size - 1,
4e9afeba 5648 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5649 if (IS_ERR(copts)) {
5650 ret = PTR_ERR(copts);
dc79b113
AE
5651 goto out_err;
5652 }
859c31df
AE
5653 kfree(options);
5654
5655 *ceph_opts = copts;
4e9afeba 5656 *opts = rbd_opts;
859c31df 5657 *rbd_spec = spec;
0ddebc0c 5658
dc79b113 5659 return 0;
f28e565a 5660out_mem:
dc79b113 5661 ret = -ENOMEM;
d22f76e7 5662out_err:
859c31df
AE
5663 kfree(rbd_opts);
5664 rbd_spec_put(spec);
f28e565a 5665 kfree(options);
d22f76e7 5666
dc79b113 5667 return ret;
a725f65e
AE
5668}
5669
30ba1f02
ID
5670/*
5671 * Return pool id (>= 0) or a negative error code.
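 *
 * If the pool name is not found in the cached osdmap, fetch the newest
 * map from the monitors once and retry the lookup, so that a recently
 * created pool can still be resolved.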
5672 */
5673static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5674{
a319bf56 5675 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5676 u64 newest_epoch;
30ba1f02
ID
5677 int tries = 0;
5678 int ret;
5679
5680again:
5681 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5682 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5683 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5684 &newest_epoch);
30ba1f02
ID
5685 if (ret < 0)
5686 return ret;
5687
5688 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5689 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5690 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5691 newest_epoch,
5692 opts->mount_timeout);
30ba1f02
ID
5693 goto again;
5694 } else {
5695 /* the osdmap we have is new enough */
5696 return -ENOENT;
5697 }
5698 }
5699
5700 return ret;
5701}
5702
e010dd0a
ID
5703static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5704{
5705 down_write(&rbd_dev->lock_rwsem);
5706 if (__rbd_is_lock_owner(rbd_dev))
5707 rbd_unlock(rbd_dev);
5708 up_write(&rbd_dev->lock_rwsem);
5709}
5710
5711static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5712{
5713 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5714 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5715 return -EINVAL;
5716 }
5717
 5718 /* FIXME: "rbd map --exclusive" should be interruptible */
5719 down_read(&rbd_dev->lock_rwsem);
5720 rbd_wait_state_locked(rbd_dev);
5721 up_read(&rbd_dev->lock_rwsem);
5722 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5723 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5724 return -EROFS;
5725 }
5726
5727 return 0;
5728}
5729
589d30e0
AE
5730/*
5731 * An rbd format 2 image has a unique identifier, distinct from the
5732 * name given to it by the user. Internally, that identifier is
5733 * what's used to specify the names of objects related to the image.
5734 *
5735 * A special "rbd id" object is used to map an rbd image name to its
5736 * id. If that object doesn't exist, then there is no v2 rbd image
5737 * with the supplied name.
5738 *
5739 * This function will record the given rbd_dev's image_id field if
5740 * it can be determined, and in that case will return 0. If any
5741 * errors occur a negative errno will be returned and the rbd_dev's
5742 * image_id field will be unchanged (and should be NULL).
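 *
 * As an illustration, for an image named "foo" the id object queried
 * below is named RBD_ID_PREFIX "foo" (see the ceph_oid_aprintf() call),
 * and its "get_id" class method returns the image id that is then used
 * to build the format 2 header object name.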
5743 */
5744static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5745{
5746 int ret;
5747 size_t size;
ecd4a68a 5748 CEPH_DEFINE_OID_ONSTACK(oid);
589d30e0 5749 void *response;
c0fba368 5750 char *image_id;
2f82ee54 5751
2c0d0a10
AE
5752 /*
5753 * When probing a parent image, the image id is already
5754 * known (and the image name likely is not). There's no
c0fba368
AE
5755 * need to fetch the image id again in this case. We
5756 * do still need to set the image format though.
2c0d0a10 5757 */
c0fba368
AE
5758 if (rbd_dev->spec->image_id) {
5759 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5760
2c0d0a10 5761 return 0;
c0fba368 5762 }
2c0d0a10 5763
589d30e0
AE
5764 /*
5765 * First, see if the format 2 image id file exists, and if
5766 * so, get the image's persistent id from it.
5767 */
ecd4a68a
ID
5768 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5769 rbd_dev->spec->image_name);
5770 if (ret)
5771 return ret;
5772
5773 dout("rbd id object name is %s\n", oid.name);
589d30e0
AE
5774
5775 /* Response will be an encoded string, which includes a length */
5776
5777 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5778 response = kzalloc(size, GFP_NOIO);
5779 if (!response) {
5780 ret = -ENOMEM;
5781 goto out;
5782 }
5783
c0fba368
AE
5784 /* If it doesn't exist we'll assume it's a format 1 image */
5785
ecd4a68a
ID
5786 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5787 "get_id", NULL, 0,
5788 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5789 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5790 if (ret == -ENOENT) {
5791 image_id = kstrdup("", GFP_KERNEL);
5792 ret = image_id ? 0 : -ENOMEM;
5793 if (!ret)
5794 rbd_dev->image_format = 1;
7dd440c9 5795 } else if (ret >= 0) {
c0fba368
AE
5796 void *p = response;
5797
5798 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5799 NULL, GFP_NOIO);
461f758a 5800 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5801 if (!ret)
5802 rbd_dev->image_format = 2;
c0fba368
AE
5803 }
5804
5805 if (!ret) {
5806 rbd_dev->spec->image_id = image_id;
5807 dout("image_id is %s\n", image_id);
589d30e0
AE
5808 }
5809out:
5810 kfree(response);
ecd4a68a 5811 ceph_oid_destroy(&oid);
589d30e0
AE
5812 return ret;
5813}
5814
3abef3b3
AE
5815/*
 5816 * Undo whatever state changes are made by a v1 or v2 header info
5817 * call.
5818 */
6fd48b3b
AE
5819static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5820{
5821 struct rbd_image_header *header;
5822
e69b8d41 5823 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5824
5825 /* Free dynamic fields from the header, then zero it out */
5826
5827 header = &rbd_dev->header;
812164f8 5828 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5829 kfree(header->snap_sizes);
5830 kfree(header->snap_names);
5831 kfree(header->object_prefix);
5832 memset(header, 0, sizeof (*header));
5833}
5834
2df3fac7 5835static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5836{
5837 int ret;
a30b71b9 5838
1e130199 5839 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5840 if (ret)
b1b5402a
AE
5841 goto out_err;
5842
2df3fac7
AE
5843 /*
 5844 * Get and check the features for the image. Currently the
5845 * features are assumed to never change.
5846 */
b1b5402a 5847 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5848 if (ret)
9d475de5 5849 goto out_err;
35d489f9 5850
cc070d59
AE
5851 /* If the image supports fancy striping, get its parameters */
5852
5853 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5854 ret = rbd_dev_v2_striping_info(rbd_dev);
5855 if (ret < 0)
5856 goto out_err;
5857 }
a30b71b9 5858
7e97332e
ID
5859 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5860 ret = rbd_dev_v2_data_pool(rbd_dev);
5861 if (ret)
5862 goto out_err;
5863 }
5864
263423f8 5865 rbd_init_layout(rbd_dev);
35152979 5866 return 0;
263423f8 5867
9d475de5 5868out_err:
642a2537 5869 rbd_dev->header.features = 0;
1e130199
AE
5870 kfree(rbd_dev->header.object_prefix);
5871 rbd_dev->header.object_prefix = NULL;
9d475de5 5872 return ret;
a30b71b9
AE
5873}
5874
6d69bb53
ID
5875/*
5876 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5877 * rbd_dev_image_probe() recursion depth, which means it's also the
5878 * length of the already discovered part of the parent chain.
5879 */
5880static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5881{
2f82ee54 5882 struct rbd_device *parent = NULL;
124afba2
AE
5883 int ret;
5884
5885 if (!rbd_dev->parent_spec)
5886 return 0;
124afba2 5887
6d69bb53
ID
5888 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5889 pr_info("parent chain is too long (%d)\n", depth);
5890 ret = -EINVAL;
5891 goto out_err;
5892 }
5893
1643dfa4 5894 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5895 if (!parent) {
5896 ret = -ENOMEM;
124afba2 5897 goto out_err;
1f2c6651
ID
5898 }
5899
5900 /*
5901 * Images related by parent/child relationships always share
5902 * rbd_client and spec/parent_spec, so bump their refcounts.
5903 */
5904 __rbd_get_client(rbd_dev->rbd_client);
5905 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5906
6d69bb53 5907 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5908 if (ret < 0)
5909 goto out_err;
1f2c6651 5910
124afba2 5911 rbd_dev->parent = parent;
a2acd00e 5912 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5913 return 0;
1f2c6651 5914
124afba2 5915out_err:
1f2c6651 5916 rbd_dev_unparent(rbd_dev);
1761b229 5917 rbd_dev_destroy(parent);
124afba2
AE
5918 return ret;
5919}
5920
5769ed0c
ID
5921static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5922{
5923 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5924 rbd_dev_mapping_clear(rbd_dev);
5925 rbd_free_disk(rbd_dev);
5926 if (!single_major)
5927 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5928}
5929
811c6688
ID
5930/*
5931 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5932 * upon return.
5933 */
200a6a8b 5934static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5935{
83a06263 5936 int ret;
d1cf5788 5937
9b60e70b 5938 /* Record our major and minor device numbers. */
83a06263 5939
9b60e70b
ID
5940 if (!single_major) {
5941 ret = register_blkdev(0, rbd_dev->name);
5942 if (ret < 0)
1643dfa4 5943 goto err_out_unlock;
9b60e70b
ID
5944
5945 rbd_dev->major = ret;
5946 rbd_dev->minor = 0;
5947 } else {
5948 rbd_dev->major = rbd_major;
5949 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5950 }
83a06263
AE
5951
5952 /* Set up the blkdev mapping. */
5953
5954 ret = rbd_init_disk(rbd_dev);
5955 if (ret)
5956 goto err_out_blkdev;
5957
f35a4dee 5958 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
5959 if (ret)
5960 goto err_out_disk;
bc1ecc65 5961
f35a4dee 5962 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
9568c93e 5963 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
f35a4dee 5964
5769ed0c 5965 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
f35a4dee 5966 if (ret)
f5ee37bd 5967 goto err_out_mapping;
83a06263 5968
129b79d4 5969 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 5970 up_write(&rbd_dev->header_rwsem);
5769ed0c 5971 return 0;
2f82ee54 5972
f35a4dee
AE
5973err_out_mapping:
5974 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
5975err_out_disk:
5976 rbd_free_disk(rbd_dev);
5977err_out_blkdev:
9b60e70b
ID
5978 if (!single_major)
5979 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
5980err_out_unlock:
5981 up_write(&rbd_dev->header_rwsem);
83a06263
AE
5982 return ret;
5983}
5984
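/*
 * Format 1 images name their header object "<image_name>" RBD_SUFFIX;
 * format 2 images use RBD_HEADER_PREFIX "<image_id>".
 */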
332bb12d
AE
5985static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5986{
5987 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 5988 int ret;
332bb12d
AE
5989
5990 /* Record the header object name for this rbd image. */
5991
5992 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
332bb12d 5993 if (rbd_dev->image_format == 1)
c41d13a3
ID
5994 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5995 spec->image_name, RBD_SUFFIX);
332bb12d 5996 else
c41d13a3
ID
5997 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5998 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 5999
c41d13a3 6000 return ret;
332bb12d
AE
6001}
6002
200a6a8b
AE
6003static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6004{
6fd48b3b 6005 rbd_dev_unprobe(rbd_dev);
fd22aef8
ID
6006 if (rbd_dev->opts)
6007 rbd_unregister_watch(rbd_dev);
6fd48b3b
AE
6008 rbd_dev->image_format = 0;
6009 kfree(rbd_dev->spec->image_id);
6010 rbd_dev->spec->image_id = NULL;
200a6a8b
AE
6011}
6012
a30b71b9
AE
6013/*
6014 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
6015 * device. If this image is the one being mapped (i.e., not a
6016 * parent), initiate a watch on its header object before using that
6017 * object to get detailed information about the rbd image.
a30b71b9 6018 */
6d69bb53 6019static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
6020{
6021 int ret;
6022
6023 /*
3abef3b3
AE
6024 * Get the id from the image id object. Unless there's an
6025 * error, rbd_dev->spec->image_id will be filled in with
6026 * a dynamically-allocated string, and rbd_dev->image_format
6027 * will be set to either 1 or 2.
a30b71b9
AE
6028 */
6029 ret = rbd_dev_image_id(rbd_dev);
6030 if (ret)
c0fba368 6031 return ret;
c0fba368 6032
332bb12d
AE
6033 ret = rbd_dev_header_name(rbd_dev);
6034 if (ret)
6035 goto err_out_format;
6036
6d69bb53 6037 if (!depth) {
99d16943 6038 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
6039 if (ret) {
6040 if (ret == -ENOENT)
6041 pr_info("image %s/%s does not exist\n",
6042 rbd_dev->spec->pool_name,
6043 rbd_dev->spec->image_name);
c41d13a3 6044 goto err_out_format;
1fe48023 6045 }
1f3ef788 6046 }
b644de2b 6047
a720ae09 6048 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 6049 if (ret)
b644de2b 6050 goto err_out_watch;
83a06263 6051
04077599
ID
6052 /*
6053 * If this image is the one being mapped, we have pool name and
6054 * id, image name and id, and snap name - need to fill snap id.
6055 * Otherwise this is a parent image, identified by pool, image
6056 * and snap ids - need to fill in names for those ids.
6057 */
6d69bb53 6058 if (!depth)
04077599
ID
6059 ret = rbd_spec_fill_snap_id(rbd_dev);
6060 else
6061 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
6062 if (ret) {
6063 if (ret == -ENOENT)
6064 pr_info("snap %s/%s@%s does not exist\n",
6065 rbd_dev->spec->pool_name,
6066 rbd_dev->spec->image_name,
6067 rbd_dev->spec->snap_name);
33dca39f 6068 goto err_out_probe;
1fe48023 6069 }
9bb81c9b 6070
e8f59b59
ID
6071 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6072 ret = rbd_dev_v2_parent_info(rbd_dev);
6073 if (ret)
6074 goto err_out_probe;
6075
6076 /*
6077 * Need to warn users if this image is the one being
6078 * mapped and has a parent.
6079 */
6d69bb53 6080 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
6081 rbd_warn(rbd_dev,
6082 "WARNING: kernel layering is EXPERIMENTAL!");
6083 }
6084
6d69bb53 6085 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
6086 if (ret)
6087 goto err_out_probe;
6088
6089 dout("discovered format %u image, header name is %s\n",
c41d13a3 6090 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 6091 return 0;
e8f59b59 6092
6fd48b3b
AE
6093err_out_probe:
6094 rbd_dev_unprobe(rbd_dev);
b644de2b 6095err_out_watch:
6d69bb53 6096 if (!depth)
99d16943 6097 rbd_unregister_watch(rbd_dev);
332bb12d
AE
6098err_out_format:
6099 rbd_dev->image_format = 0;
5655c4d9
AE
6100 kfree(rbd_dev->spec->image_id);
6101 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
6102 return ret;
6103}
6104
9b60e70b
ID
6105static ssize_t do_rbd_add(struct bus_type *bus,
6106 const char *buf,
6107 size_t count)
602adf40 6108{
cb8627c7 6109 struct rbd_device *rbd_dev = NULL;
dc79b113 6110 struct ceph_options *ceph_opts = NULL;
4e9afeba 6111 struct rbd_options *rbd_opts = NULL;
859c31df 6112 struct rbd_spec *spec = NULL;
9d3997fd 6113 struct rbd_client *rbdc;
b51c83c2 6114 int rc;
602adf40
YS
6115
6116 if (!try_module_get(THIS_MODULE))
6117 return -ENODEV;
6118
602adf40 6119 /* parse add command */
859c31df 6120 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 6121 if (rc < 0)
dd5ac32d 6122 goto out;
78cea76e 6123
9d3997fd
AE
6124 rbdc = rbd_get_client(ceph_opts);
6125 if (IS_ERR(rbdc)) {
6126 rc = PTR_ERR(rbdc);
0ddebc0c 6127 goto err_out_args;
9d3997fd 6128 }
602adf40 6129
602adf40 6130 /* pick the pool */
30ba1f02 6131 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
6132 if (rc < 0) {
6133 if (rc == -ENOENT)
6134 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 6135 goto err_out_client;
1fe48023 6136 }
c0cd10db 6137 spec->pool_id = (u64)rc;
859c31df 6138
d147543d 6139 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
6140 if (!rbd_dev) {
6141 rc = -ENOMEM;
bd4ba655 6142 goto err_out_client;
b51c83c2 6143 }
c53d5893
AE
6144 rbdc = NULL; /* rbd_dev now owns this */
6145 spec = NULL; /* rbd_dev now owns this */
d147543d 6146 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 6147
0d6d1e9c
MC
6148 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6149 if (!rbd_dev->config_info) {
6150 rc = -ENOMEM;
6151 goto err_out_rbd_dev;
6152 }
6153
811c6688 6154 down_write(&rbd_dev->header_rwsem);
6d69bb53 6155 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
6156 if (rc < 0) {
6157 up_write(&rbd_dev->header_rwsem);
c53d5893 6158 goto err_out_rbd_dev;
0d6d1e9c 6159 }
05fd6f6f 6160
7ce4eef7 6161 /* If we are mapping a snapshot it must be marked read-only */
7ce4eef7 6162 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9568c93e 6163 rbd_dev->opts->read_only = true;
7ce4eef7 6164
b536f69a 6165 rc = rbd_dev_device_setup(rbd_dev);
fd22aef8 6166 if (rc)
8b679ec5 6167 goto err_out_image_probe;
3abef3b3 6168
e010dd0a
ID
6169 if (rbd_dev->opts->exclusive) {
6170 rc = rbd_add_acquire_lock(rbd_dev);
6171 if (rc)
6172 goto err_out_device_setup;
3abef3b3
AE
6173 }
6174
5769ed0c
ID
6175 /* Everything's ready. Announce the disk to the world. */
6176
6177 rc = device_add(&rbd_dev->dev);
6178 if (rc)
e010dd0a 6179 goto err_out_image_lock;
5769ed0c
ID
6180
6181 add_disk(rbd_dev->disk);
6182 /* see rbd_init_disk() */
6183 blk_put_queue(rbd_dev->disk->queue);
6184
6185 spin_lock(&rbd_dev_list_lock);
6186 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6187 spin_unlock(&rbd_dev_list_lock);
6188
6189 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6190 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6191 rbd_dev->header.features);
dd5ac32d
ID
6192 rc = count;
6193out:
6194 module_put(THIS_MODULE);
6195 return rc;
b536f69a 6196
e010dd0a
ID
6197err_out_image_lock:
6198 rbd_dev_image_unlock(rbd_dev);
5769ed0c
ID
6199err_out_device_setup:
6200 rbd_dev_device_release(rbd_dev);
8b679ec5
ID
6201err_out_image_probe:
6202 rbd_dev_image_release(rbd_dev);
c53d5893
AE
6203err_out_rbd_dev:
6204 rbd_dev_destroy(rbd_dev);
bd4ba655 6205err_out_client:
9d3997fd 6206 rbd_put_client(rbdc);
0ddebc0c 6207err_out_args:
859c31df 6208 rbd_spec_put(spec);
d147543d 6209 kfree(rbd_opts);
dd5ac32d 6210 goto out;
602adf40
YS
6211}
6212
9b60e70b
ID
6213static ssize_t rbd_add(struct bus_type *bus,
6214 const char *buf,
6215 size_t count)
6216{
6217 if (single_major)
6218 return -EINVAL;
6219
6220 return do_rbd_add(bus, buf, count);
6221}
6222
6223static ssize_t rbd_add_single_major(struct bus_type *bus,
6224 const char *buf,
6225 size_t count)
6226{
6227 return do_rbd_add(bus, buf, count);
6228}
6229
05a46afd
AE
6230static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6231{
ad945fc1 6232 while (rbd_dev->parent) {
05a46afd
AE
6233 struct rbd_device *first = rbd_dev;
6234 struct rbd_device *second = first->parent;
6235 struct rbd_device *third;
6236
6237 /*
6238 * Follow to the parent with no grandparent and
6239 * remove it.
6240 */
6241 while (second && (third = second->parent)) {
6242 first = second;
6243 second = third;
6244 }
ad945fc1 6245 rbd_assert(second);
8ad42cd0 6246 rbd_dev_image_release(second);
8b679ec5 6247 rbd_dev_destroy(second);
ad945fc1
AE
6248 first->parent = NULL;
6249 first->parent_overlap = 0;
6250
6251 rbd_assert(first->parent_spec);
05a46afd
AE
6252 rbd_spec_put(first->parent_spec);
6253 first->parent_spec = NULL;
05a46afd
AE
6254 }
6255}
6256
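/*
 * Handle a write to the sysfs "remove" (or "remove_single_major")
 * attribute. The buffer carries a device id, optionally followed by
 * "force" (e.g. "2 force"), which tears the mapping down even while
 * the device is still open.
 */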
9b60e70b
ID
6257static ssize_t do_rbd_remove(struct bus_type *bus,
6258 const char *buf,
6259 size_t count)
602adf40
YS
6260{
6261 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
6262 struct list_head *tmp;
6263 int dev_id;
0276dca6 6264 char opt_buf[6];
82a442d2 6265 bool already = false;
0276dca6 6266 bool force = false;
0d8189e1 6267 int ret;
602adf40 6268
0276dca6
MC
6269 dev_id = -1;
6270 opt_buf[0] = '\0';
6271 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6272 if (dev_id < 0) {
6273 pr_err("dev_id out of range\n");
602adf40 6274 return -EINVAL;
0276dca6
MC
6275 }
6276 if (opt_buf[0] != '\0') {
6277 if (!strcmp(opt_buf, "force")) {
6278 force = true;
6279 } else {
6280 pr_err("bad remove option at '%s'\n", opt_buf);
6281 return -EINVAL;
6282 }
6283 }
602adf40 6284
751cc0e3
AE
6285 ret = -ENOENT;
6286 spin_lock(&rbd_dev_list_lock);
6287 list_for_each(tmp, &rbd_dev_list) {
6288 rbd_dev = list_entry(tmp, struct rbd_device, node);
6289 if (rbd_dev->dev_id == dev_id) {
6290 ret = 0;
6291 break;
6292 }
42382b70 6293 }
751cc0e3
AE
6294 if (!ret) {
6295 spin_lock_irq(&rbd_dev->lock);
0276dca6 6296 if (rbd_dev->open_count && !force)
751cc0e3
AE
6297 ret = -EBUSY;
6298 else
82a442d2
AE
6299 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6300 &rbd_dev->flags);
751cc0e3
AE
6301 spin_unlock_irq(&rbd_dev->lock);
6302 }
6303 spin_unlock(&rbd_dev_list_lock);
82a442d2 6304 if (ret < 0 || already)
1ba0f1e7 6305 return ret;
751cc0e3 6306
0276dca6
MC
6307 if (force) {
6308 /*
6309 * Prevent new IO from being queued and wait for existing
6310 * IO to complete/fail.
6311 */
6312 blk_mq_freeze_queue(rbd_dev->disk->queue);
6313 blk_set_queue_dying(rbd_dev->disk->queue);
6314 }
6315
5769ed0c
ID
6316 del_gendisk(rbd_dev->disk);
6317 spin_lock(&rbd_dev_list_lock);
6318 list_del_init(&rbd_dev->node);
6319 spin_unlock(&rbd_dev_list_lock);
6320 device_del(&rbd_dev->dev);
fca27065 6321
e010dd0a 6322 rbd_dev_image_unlock(rbd_dev);
dd5ac32d 6323 rbd_dev_device_release(rbd_dev);
8ad42cd0 6324 rbd_dev_image_release(rbd_dev);
8b679ec5 6325 rbd_dev_destroy(rbd_dev);
1ba0f1e7 6326 return count;
602adf40
YS
6327}
6328
9b60e70b
ID
6329static ssize_t rbd_remove(struct bus_type *bus,
6330 const char *buf,
6331 size_t count)
6332{
6333 if (single_major)
6334 return -EINVAL;
6335
6336 return do_rbd_remove(bus, buf, count);
6337}
6338
6339static ssize_t rbd_remove_single_major(struct bus_type *bus,
6340 const char *buf,
6341 size_t count)
6342{
6343 return do_rbd_remove(bus, buf, count);
6344}
6345
602adf40
YS
6346/*
6347 * create control files in sysfs
dfc5606d 6348 * /sys/bus/rbd/...
602adf40
YS
6349 */
6350static int rbd_sysfs_init(void)
6351{
dfc5606d 6352 int ret;
602adf40 6353
fed4c143 6354 ret = device_register(&rbd_root_dev);
21079786 6355 if (ret < 0)
dfc5606d 6356 return ret;
602adf40 6357
fed4c143
AE
6358 ret = bus_register(&rbd_bus_type);
6359 if (ret < 0)
6360 device_unregister(&rbd_root_dev);
602adf40 6361
602adf40
YS
6362 return ret;
6363}
6364
6365static void rbd_sysfs_cleanup(void)
6366{
dfc5606d 6367 bus_unregister(&rbd_bus_type);
fed4c143 6368 device_unregister(&rbd_root_dev);
602adf40
YS
6369}
6370
1c2a9dfe
AE
6371static int rbd_slab_init(void)
6372{
6373 rbd_assert(!rbd_img_request_cache);
03d94406 6374 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
6375 if (!rbd_img_request_cache)
6376 return -ENOMEM;
6377
6378 rbd_assert(!rbd_obj_request_cache);
03d94406 6379 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
6380 if (!rbd_obj_request_cache)
6381 goto out_err;
6382
f856dc36
N
6383 rbd_assert(!rbd_bio_clone);
6384 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6385 if (!rbd_bio_clone)
6386 goto out_err_clone;
6387
6c696d85 6388 return 0;
1c2a9dfe 6389
f856dc36
N
6390out_err_clone:
6391 kmem_cache_destroy(rbd_obj_request_cache);
6392 rbd_obj_request_cache = NULL;
6c696d85 6393out_err:
868311b1
AE
6394 kmem_cache_destroy(rbd_img_request_cache);
6395 rbd_img_request_cache = NULL;
1c2a9dfe
AE
6396 return -ENOMEM;
6397}
6398
6399static void rbd_slab_exit(void)
6400{
868311b1
AE
6401 rbd_assert(rbd_obj_request_cache);
6402 kmem_cache_destroy(rbd_obj_request_cache);
6403 rbd_obj_request_cache = NULL;
6404
1c2a9dfe
AE
6405 rbd_assert(rbd_img_request_cache);
6406 kmem_cache_destroy(rbd_img_request_cache);
6407 rbd_img_request_cache = NULL;
f856dc36
N
6408
6409 rbd_assert(rbd_bio_clone);
6410 bioset_free(rbd_bio_clone);
6411 rbd_bio_clone = NULL;
1c2a9dfe
AE
6412}
6413
cc344fa1 6414static int __init rbd_init(void)
602adf40
YS
6415{
6416 int rc;
6417
1e32d34c
AE
6418 if (!libceph_compatible(NULL)) {
6419 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
6420 return -EINVAL;
6421 }
e1b4d96d 6422
1c2a9dfe 6423 rc = rbd_slab_init();
602adf40
YS
6424 if (rc)
6425 return rc;
e1b4d96d 6426
f5ee37bd
ID
6427 /*
6428 * The number of active work items is limited by the number of
f77303bd 6429 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
6430 */
6431 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6432 if (!rbd_wq) {
6433 rc = -ENOMEM;
6434 goto err_out_slab;
6435 }
6436
9b60e70b
ID
6437 if (single_major) {
6438 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6439 if (rbd_major < 0) {
6440 rc = rbd_major;
f5ee37bd 6441 goto err_out_wq;
9b60e70b
ID
6442 }
6443 }
6444
1c2a9dfe
AE
6445 rc = rbd_sysfs_init();
6446 if (rc)
9b60e70b
ID
6447 goto err_out_blkdev;
6448
6449 if (single_major)
6450 pr_info("loaded (major %d)\n", rbd_major);
6451 else
6452 pr_info("loaded\n");
1c2a9dfe 6453
e1b4d96d
ID
6454 return 0;
6455
9b60e70b
ID
6456err_out_blkdev:
6457 if (single_major)
6458 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6459err_out_wq:
6460 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6461err_out_slab:
6462 rbd_slab_exit();
1c2a9dfe 6463 return rc;
602adf40
YS
6464}
6465
cc344fa1 6466static void __exit rbd_exit(void)
602adf40 6467{
ffe312cf 6468 ida_destroy(&rbd_dev_id_ida);
602adf40 6469 rbd_sysfs_cleanup();
9b60e70b
ID
6470 if (single_major)
6471 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6472 destroy_workqueue(rbd_wq);
1c2a9dfe 6473 rbd_slab_exit();
602adf40
YS
6474}
6475
6476module_init(rbd_init);
6477module_exit(rbd_exit);
6478
d552c619 6479MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6480MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6481MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6482/* following authorship retained from original osdblk.c */
6483MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6484
90da258b 6485MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6486MODULE_LICENSE("GPL");