rbd: move rbd_get_client() below rbd_put_client()
drivers/block/rbd.c (linux-block.git)

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

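/*
 * Illustrative pairing (cf. rbd_dev_parent_get()/_put() below): these
 * helpers implement a saturating reference count in which 0 means
 * "torn down", so a successful get is signalled by a value > 0:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		...					// parent may be used
 *		if (!atomic_dec_return_safe(&rbd_dev->parent_ref))
 *			rbd_dev_unparent(rbd_dev);	// last reference
 *	}
 */
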
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
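
/*
 * Example: an image with just layering and exclusive-lock enabled has a
 * feature mask of (1ULL<<0) | (1ULL<<2) == 0x5.
 */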

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *        |     ^                              |
 *        v     \------------------------------/
 *      done
 *        ^
 *        |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;	/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

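/*
 * Example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, dev_id 3 maps to
 * minor 48 (3 << 4), reserving minors 48..63 for rbd3 and its
 * partitions.
 */
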
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

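/*
 * The attributes above surface under /sys/bus/rbd/ once the bus is
 * registered: writing a mapping spec to /sys/bus/rbd/add (or
 * add_single_major) maps an image, and writing a device id to the
 * corresponding remove attribute unmaps it.  See
 * Documentation/ABI/testing/sysfs-bus-rbd for the exact format.
 */
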
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

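/*
 * Illustrative example: an option string such as
 * "queue_depth=128,read_only,lock_on_read" is split up by libceph,
 * which hands each token it does not recognize itself to
 * parse_rbd_opts_token() below, yielding rbd_opts->queue_depth = 128
 * and ->read_only = ->lock_on_read = true.
 */
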
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock to remove the
 * client from rbd_client_list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

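/*
 * Worked example: for snapc->snaps = { 10, 7, 3 } (descending, newest
 * first), snapid_compare_reverse() lets bsearch() locate id 7 at
 * &snaps[1], so rbd_dev_snap_index() returns 1; a lookup of id 5
 * returns BAD_SNAP_INDEX.
 */
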
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->obj_request_count++;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
		return true;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

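/*
 * Note: RBD_V1_DATA_FORMAT and RBD_V2_DATA_FORMAT above are defined in
 * rbd_types.h (not shown here); they build data object names from the
 * object prefix and oe_objno, e.g. "<object_prefix>.%012llx" for format
 * 1 and "<object_prefix>.%016llx" for format 2 images.
 */
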
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		rbd_assert(0);
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
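
/*
 * Example: with overlap == 4096, an extent { fe_off = 3072, fe_len = 2048 }
 * is trimmed to fe_len = 1024, and any extent starting at or beyond
 * offset 4096 is dropped entirely.
 */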

/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		rbd_assert(0);
	}
}

static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
	if (!obj_req->osd_req)
		return -ENOMEM;

	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_req_setup_data(obj_req, 0);

	rbd_osd_req_format_read(obj_req);
	return 0;
}

static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
				unsigned int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}

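/*
 * The STAT op set up above acts as the guard for layered writes: if the
 * object does not exist, the OSD request fails with -ENOENT and the
 * completion path takes the RBD_OBJ_WRITE_GUARD -> RBD_OBJ_WRITE_COPYUP
 * transition shown in the state machine diagram near the top of this
 * file.
 */
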
1737static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1738 unsigned int which)
1739{
1740 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1741 u16 opcode;
1742
1743 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1744 rbd_dev->layout.object_size,
1745 rbd_dev->layout.object_size);
1746
1747 if (rbd_obj_is_entire(obj_req))
1748 opcode = CEPH_OSD_OP_WRITEFULL;
1749 else
1750 opcode = CEPH_OSD_OP_WRITE;
1751
1752 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
43df3d35 1753 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf
ID
1754 rbd_osd_req_setup_data(obj_req, which++);
1755
1756 rbd_assert(which == obj_req->osd_req->r_num_ops);
1757 rbd_osd_req_format_write(obj_req);
1758}
1759
1760static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1761{
3da691bf
ID
1762 unsigned int num_osd_ops, which = 0;
1763 int ret;
1764
86bd7998
ID
1765 /* reverse map the entire object onto the parent */
1766 ret = rbd_obj_calc_img_extents(obj_req, true);
1767 if (ret)
1768 return ret;
1769
1770 if (obj_req->num_img_extents) {
3da691bf
ID
1771 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1772 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1773 } else {
1774 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1775 num_osd_ops = 2; /* setallochint + write/writefull */
1776 }
1777
a162b308 1778 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1779 if (!obj_req->osd_req)
1780 return -ENOMEM;
1781
86bd7998 1782 if (obj_req->num_img_extents) {
3da691bf
ID
1783 ret = __rbd_obj_setup_stat(obj_req, which++);
1784 if (ret)
1785 return ret;
1786 }
1787
1788 __rbd_obj_setup_write(obj_req, which);
1789 return 0;
1790}
1791
1792static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1793 unsigned int which)
1794{
1795 u16 opcode;
1796
1797 if (rbd_obj_is_entire(obj_req)) {
86bd7998 1798 if (obj_req->num_img_extents) {
2bb1e56e
ID
1799 osd_req_op_init(obj_req->osd_req, which++,
1800 CEPH_OSD_OP_CREATE, 0);
3da691bf
ID
1801 opcode = CEPH_OSD_OP_TRUNCATE;
1802 } else {
1803 osd_req_op_init(obj_req->osd_req, which++,
1804 CEPH_OSD_OP_DELETE, 0);
1805 opcode = 0;
1806 }
1807 } else if (rbd_obj_is_tail(obj_req)) {
1808 opcode = CEPH_OSD_OP_TRUNCATE;
1809 } else {
1810 opcode = CEPH_OSD_OP_ZERO;
1811 }
1812
1813 if (opcode)
1814 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
43df3d35 1815 obj_req->ex.oe_off, obj_req->ex.oe_len,
3da691bf
ID
1816 0, 0);
1817
1818 rbd_assert(which == obj_req->osd_req->r_num_ops);
1819 rbd_osd_req_format_write(obj_req);
1820}
1821
1822static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1823{
3da691bf
ID
1824 unsigned int num_osd_ops, which = 0;
1825 int ret;
1826
86bd7998
ID
1827 /* reverse map the entire object onto the parent */
1828 ret = rbd_obj_calc_img_extents(obj_req, true);
1829 if (ret)
1830 return ret;
1831
3da691bf
ID
1832 if (rbd_obj_is_entire(obj_req)) {
1833 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2bb1e56e
ID
1834 if (obj_req->num_img_extents)
1835 num_osd_ops = 2; /* create + truncate */
1836 else
1837 num_osd_ops = 1; /* delete */
3da691bf 1838 } else {
86bd7998 1839 if (obj_req->num_img_extents) {
3da691bf
ID
1840 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1841 num_osd_ops = 2; /* stat + truncate/zero */
1842 } else {
1843 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1844 num_osd_ops = 1; /* truncate/zero */
1845 }
1846 }
1847
a162b308 1848 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1849 if (!obj_req->osd_req)
1850 return -ENOMEM;
1851
86bd7998 1852 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
3da691bf
ID
1853 ret = __rbd_obj_setup_stat(obj_req, which++);
1854 if (ret)
1855 return ret;
1856 }
1857
1858 __rbd_obj_setup_discard(obj_req, which);
1859 return 0;
1860}

/*
 * For each object request in @img_req, allocate an OSD request, add
 * individual OSD ops and prepare them for submission.  The number of
 * OSD ops depends on op_type and the overlap point (if any).
 */
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req;
	int ret;

	for_each_obj_request(img_req, obj_req) {
		switch (img_req->op_type) {
		case OBJ_OP_READ:
			ret = rbd_obj_setup_read(obj_req);
			break;
		case OBJ_OP_WRITE:
			ret = rbd_obj_setup_write(obj_req);
			break;
		case OBJ_OP_DISCARD:
			ret = rbd_obj_setup_discard(obj_req);
			break;
		default:
			rbd_assert(0);
		}
		if (ret)
			return ret;
	}

	return 0;
}

union rbd_img_fill_iter {
	struct ceph_bio_iter bio_iter;
	struct ceph_bvec_iter bvec_iter;
};

struct rbd_img_fill_ctx {
	enum obj_request_type pos_type;
	union rbd_img_fill_iter *pos;
	union rbd_img_fill_iter iter;
	ceph_object_extent_fn_t set_pos_fn;
	ceph_object_extent_fn_t count_fn;
	ceph_object_extent_fn_t copy_fn;
};

static struct ceph_object_extent *alloc_object_extent(void *arg)
{
	struct rbd_img_request *img_req = arg;
	struct rbd_obj_request *obj_req;

	obj_req = rbd_obj_request_create();
	if (!obj_req)
		return NULL;

	rbd_img_obj_request_add(img_req, obj_req);
	return &obj_req->ex;
}

/*
 * While su != os && sc == 1 is technically not fancy (it's the same
 * layout as su == os && sc == 1), we can't use the nocopy path for it
 * because ->set_pos_fn() should be called only once per object.
 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
 * treat su != os && sc == 1 as fancy.
 */
static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
{
	return l->stripe_unit != l->object_size;
}
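
/*
 * For illustration (hypothetical layout): with su = 64K, sc = 4 and
 * os = 4M, file bytes [0, 64K) go to object 0, [64K, 128K) to object 1,
 * ..., [192K, 256K) to object 3, and [256K, 320K) wrap back to object 0
 * at offset 64K.  With the default su == os, sc == 1 layout the mapping
 * is a simple split at object size boundaries and the nocopy path can
 * be used.
 */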

static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
				       struct ceph_file_extent *img_extents,
				       u32 num_img_extents,
				       struct rbd_img_fill_ctx *fctx)
{
	u32 i;
	int ret;

	img_req->data_type = fctx->pos_type;

	/*
	 * Create object requests and set each object request's starting
	 * position in the provided bio (list) or bio_vec array.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   alloc_object_extent, img_req,
					   fctx->set_pos_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	return __rbd_img_fill_request(img_req);
}

/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests (normally each to a different object,
 * but not always) and add them to @img_req.  For each object request,
 * set up its data descriptor to point to the corresponding chunk(s) of
 * @fctx->pos data buffer.
 *
 * Because ceph_file_to_extents() will merge adjacent object extents
 * together, each object request's data descriptor may point to multiple
 * different chunks of @fctx->pos data buffer.
 *
 * @fctx->pos data buffer is assumed to be large enough.
 */
static int rbd_img_fill_request(struct rbd_img_request *img_req,
				struct ceph_file_extent *img_extents,
				u32 num_img_extents,
				struct rbd_img_fill_ctx *fctx)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct rbd_obj_request *obj_req;
	u32 i;
	int ret;

	if (fctx->pos_type == OBJ_REQUEST_NODATA ||
	    !rbd_layout_is_fancy(&rbd_dev->layout))
		return rbd_img_fill_request_nocopy(img_req, img_extents,
						   num_img_extents, fctx);

	img_req->data_type = OBJ_REQUEST_OWN_BVECS;

	/*
	 * Create object requests and determine ->bvec_count for each object
	 * request.  Note that ->bvec_count sum over all object requests may
	 * be greater than the number of bio_vecs in the provided bio (list)
	 * or bio_vec array because when mapped, those bio_vecs can straddle
	 * stripe unit boundaries.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_file_to_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   alloc_object_extent, img_req,
					   fctx->count_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	for_each_obj_request(img_req, obj_req) {
		obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
					      sizeof(*obj_req->bvec_pos.bvecs),
					      GFP_NOIO);
		if (!obj_req->bvec_pos.bvecs)
			return -ENOMEM;
	}

	/*
	 * Fill in each object request's private bio_vec array, splitting and
	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_iterate_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   fctx->copy_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	return __rbd_img_fill_request(img_req);
}

static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
			       u64 off, u64 len)
{
	struct ceph_file_extent ex = { off, len };
	union rbd_img_fill_iter dummy;
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_NODATA,
		.pos = &dummy,
	};

	return rbd_img_fill_request(img_req, &ex, 1, &fctx);
}

static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	obj_req->bio_pos = *it;
	ceph_bio_iter_advance(it, bytes);
}

static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	ceph_bio_iter_advance_step(it, bytes, ({
		obj_req->bvec_count++;
	}));
}

static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	ceph_bio_iter_advance_step(it, bytes, ({
		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
	}));
}

static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
				   struct ceph_file_extent *img_extents,
				   u32 num_img_extents,
				   struct ceph_bio_iter *bio_pos)
{
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_BIO,
		.pos = (union rbd_img_fill_iter *)bio_pos,
		.set_pos_fn = set_bio_pos,
		.count_fn = count_bio_bvecs,
		.copy_fn = copy_bio_bvecs,
	};

	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}

static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
				 u64 off, u64 len, struct bio *bio)
{
	struct ceph_file_extent ex = { off, len };
	struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };

	return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
}

static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	obj_req->bvec_pos = *it;
	ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
	ceph_bvec_iter_advance(it, bytes);
}

static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	ceph_bvec_iter_advance_step(it, bytes, ({
		obj_req->bvec_count++;
	}));
}

static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	ceph_bvec_iter_advance_step(it, bytes, ({
		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
	}));
}

static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
				     struct ceph_file_extent *img_extents,
				     u32 num_img_extents,
				     struct ceph_bvec_iter *bvec_pos)
{
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_BVECS,
		.pos = (union rbd_img_fill_iter *)bvec_pos,
		.set_pos_fn = set_bvec_pos,
		.count_fn = count_bvecs,
		.copy_fn = copy_bvecs,
	};

	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}

static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
				   struct ceph_file_extent *img_extents,
				   u32 num_img_extents,
				   struct bio_vec *bvecs)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = ceph_file_extents_bytes(img_extents,
							     num_img_extents) },
	};

	return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
					 &it);
}

static void rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);

	rbd_img_request_get(img_request);
	for_each_obj_request(img_request, obj_request)
		rbd_obj_request_submit(obj_request);

	rbd_img_request_put(img_request);
}

static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_img_request *child_img_req;
	int ret;

	child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
					       OBJ_OP_READ, NULL);
	if (!child_img_req)
		return -ENOMEM;

	__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
	child_img_req->obj_request = obj_req;

	if (!rbd_img_is_write(img_req)) {
		switch (img_req->data_type) {
		case OBJ_REQUEST_BIO:
			ret = __rbd_img_fill_from_bio(child_img_req,
						      obj_req->img_extents,
						      obj_req->num_img_extents,
						      &obj_req->bio_pos);
			break;
		case OBJ_REQUEST_BVECS:
		case OBJ_REQUEST_OWN_BVECS:
			ret = __rbd_img_fill_from_bvecs(child_img_req,
						      obj_req->img_extents,
						      obj_req->num_img_extents,
						      &obj_req->bvec_pos);
			break;
		default:
			rbd_assert(0);
		}
	} else {
		ret = rbd_img_fill_from_bvecs(child_img_req,
					      obj_req->img_extents,
					      obj_req->num_img_extents,
					      obj_req->copyup_bvecs);
	}
	if (ret) {
		rbd_img_request_put(child_img_req);
		return ret;
	}

	rbd_img_request_submit(child_img_req);
	return 0;
}

static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (obj_req->result == -ENOENT &&
	    rbd_dev->parent_overlap && !obj_req->tried_parent) {
		/* reverse map this object extent onto the parent */
		ret = rbd_obj_calc_img_extents(obj_req, false);
		if (ret) {
			obj_req->result = ret;
			return true;
		}

		if (obj_req->num_img_extents) {
			obj_req->tried_parent = true;
			ret = rbd_obj_read_from_parent(obj_req);
			if (ret) {
				obj_req->result = ret;
				return true;
			}
			return false;
		}
	}

	/*
	 * -ENOENT means a hole in the image -- zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  In both cases we update xferred
	 * count to indicate the whole request was satisfied.
	 */
	if (obj_req->result == -ENOENT ||
	    (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
		rbd_assert(!obj_req->xferred || !obj_req->result);
		rbd_obj_zero_range(obj_req, obj_req->xferred,
				   obj_req->ex.oe_len - obj_req->xferred);
		obj_req->result = 0;
		obj_req->xferred = obj_req->ex.oe_len;
	}

	return true;
}

/*
 * copyup_bvecs pages are never highmem pages
 */
static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	ceph_bvec_iter_advance_step(&it, bytes, ({
		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
			       bv.bv_len))
			return false;
	}));
	return true;
}

static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
{
	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;

	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
	rbd_osd_req_destroy(obj_req->osd_req);

	/*
	 * Create a copyup request with the same number of OSD ops as
	 * the original request.  The original request was stat + op(s),
	 * the new copyup request will be copyup + the same op(s).
	 */
	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
	if (!obj_req->osd_req)
		return -ENOMEM;

	/*
	 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
	 */
	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
		bytes = 0;
	}

	osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
			    "copyup");
	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
					  obj_req->copyup_bvecs, bytes);

	switch (obj_req->img_request->op_type) {
	case OBJ_OP_WRITE:
		__rbd_obj_setup_write(obj_req, 1);
		break;
	case OBJ_OP_DISCARD:
		rbd_assert(!rbd_obj_is_entire(obj_req));
		__rbd_obj_setup_discard(obj_req, 1);
		break;
	default:
		rbd_assert(0);
	}

	rbd_obj_request_submit(obj_req);
	return 0;
}

static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
	u32 i;

	rbd_assert(!obj_req->copyup_bvecs);
	obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
	obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
					sizeof(*obj_req->copyup_bvecs),
					GFP_NOIO);
	if (!obj_req->copyup_bvecs)
		return -ENOMEM;

	for (i = 0; i < obj_req->copyup_bvec_count; i++) {
		unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);

		obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
		if (!obj_req->copyup_bvecs[i].bv_page)
			return -ENOMEM;

		obj_req->copyup_bvecs[i].bv_offset = 0;
		obj_req->copyup_bvecs[i].bv_len = len;
		obj_overlap -= len;
	}

	rbd_assert(!obj_overlap);
	return 0;
}
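
/*
 * For illustration (hypothetical numbers): an obj_overlap of 10000
 * bytes with 4K pages gives calc_pages_for(0, 10000) == 3 bvecs of
 * lengths 4096, 4096 and 1808, after which obj_overlap has been
 * decremented to 0 and the final assertion holds.
 */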

static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	rbd_assert(obj_req->num_img_extents);
	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	if (!obj_req->num_img_extents) {
		/*
		 * The overlap has become 0 (most likely because the
		 * image has been flattened).  Use rbd_obj_issue_copyup()
		 * to re-submit the original write request -- the copyup
		 * operation itself will be a no-op, since someone must
		 * have populated the child object while we weren't
		 * looking.  Move to WRITE_FLAT state as we'll be done
		 * with the operation once the null copyup completes.
		 */
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		return rbd_obj_issue_copyup(obj_req, 0);
	}

	ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
	if (ret)
		return ret;

	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
	return rbd_obj_read_from_parent(obj_req);
}

static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
{
	int ret;

again:
	switch (obj_req->write_state) {
	case RBD_OBJ_WRITE_GUARD:
		rbd_assert(!obj_req->xferred);
		if (obj_req->result == -ENOENT) {
			/*
			 * The target object doesn't exist.  Read the data for
			 * the entire target object up to the overlap point (if
			 * any) from the parent, so we can use it for a copyup.
			 */
			ret = rbd_obj_handle_write_guard(obj_req);
			if (ret) {
				obj_req->result = ret;
				return true;
			}
			return false;
		}
		/* fall through */
	case RBD_OBJ_WRITE_FLAT:
		if (!obj_req->result)
			/*
			 * There is no such thing as a successful short
			 * write -- indicate the whole request was satisfied.
			 */
			obj_req->xferred = obj_req->ex.oe_len;
		return true;
	case RBD_OBJ_WRITE_COPYUP:
		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
		if (obj_req->result)
			goto again;

		rbd_assert(obj_req->xferred);
		ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
		if (ret) {
			obj_req->result = ret;
			return true;
		}
		return false;
	default:
		rbd_assert(0);
	}
}
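
/*
 * A rough sketch of the write state machine implemented above:
 *
 *	FLAT:   a plain write -- any completion finishes the request
 *	GUARD:  -ENOENT from the stat guard means the object doesn't
 *		exist yet, so read the parent data and move to COPYUP;
 *		anything else is handled like FLAT
 *	COPYUP: parent data has arrived -- resubmit as copyup + op(s)
 *		and handle the completion as GUARD
 *
 * Returning true means the object request is complete; false means
 * another OSD request was issued and this handler will run again.
 */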

/*
 * Returns true if @obj_req is completed, or false otherwise.
 */
static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
{
	switch (obj_req->img_request->op_type) {
	case OBJ_OP_READ:
		return rbd_obj_handle_read(obj_req);
	case OBJ_OP_WRITE:
		return rbd_obj_handle_write(obj_req);
	case OBJ_OP_DISCARD:
		if (rbd_obj_handle_write(obj_req)) {
			/*
			 * Hide -ENOENT from delete/truncate/zero -- discarding
			 * a non-existent object is not a problem.
			 */
			if (obj_req->result == -ENOENT) {
				obj_req->result = 0;
				obj_req->xferred = obj_req->ex.oe_len;
			}
			return true;
		}
		return false;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
{
	struct rbd_img_request *img_req = obj_req->img_request;

	rbd_assert((!obj_req->result &&
		    obj_req->xferred == obj_req->ex.oe_len) ||
		   (obj_req->result < 0 && !obj_req->xferred));
	if (!obj_req->result) {
		img_req->xferred += obj_req->xferred;
		return;
	}

	rbd_warn(img_req->rbd_dev,
		 "%s at objno %llu %llu~%llu result %d xferred %llu",
		 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
		 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
		 obj_req->xferred);
	if (!img_req->result) {
		img_req->result = obj_req->result;
		img_req->xferred = 0;
	}
}

static void rbd_img_end_child_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req = img_req->obj_request;

	rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
	rbd_assert((!img_req->result &&
		    img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
		   (img_req->result < 0 && !img_req->xferred));

	obj_req->result = img_req->result;
	obj_req->xferred = img_req->xferred;
	rbd_img_request_put(img_req);
}

static void rbd_img_end_request(struct rbd_img_request *img_req)
{
	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
	rbd_assert((!img_req->result &&
		    img_req->xferred == blk_rq_bytes(img_req->rq)) ||
		   (img_req->result < 0 && !img_req->xferred));

	blk_mq_end_request(img_req->rq,
			   errno_to_blk_status(img_req->result));
	rbd_img_request_put(img_req);
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
{
	struct rbd_img_request *img_req;

again:
	if (!__rbd_obj_handle_request(obj_req))
		return;

	img_req = obj_req->img_request;
	spin_lock(&img_req->completion_lock);
	rbd_obj_end_request(obj_req);
	rbd_assert(img_req->pending_count);
	if (--img_req->pending_count) {
		spin_unlock(&img_req->completion_lock);
		return;
	}

	spin_unlock(&img_req->completion_lock);
	if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
		obj_req = img_req->obj_request;
		rbd_img_end_child_request(img_req);
		goto again;
	}
	rbd_img_end_request(img_req);
}

static const struct rbd_client_id rbd_empty_cid;

static bool rbd_cid_equal(const struct rbd_client_id *lhs,
			  const struct rbd_client_id *rhs)
{
	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
}

static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
{
	struct rbd_client_id cid;

	mutex_lock(&rbd_dev->watch_mutex);
	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
	cid.handle = rbd_dev->watch_cookie;
	mutex_unlock(&rbd_dev->watch_mutex);
	return cid;
}

/*
 * lock_rwsem must be held for write
 */
static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
			      const struct rbd_client_id *cid)
{
	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
	     cid->gid, cid->handle);
	rbd_dev->owner_cid = *cid; /* struct */
}

static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
{
	mutex_lock(&rbd_dev->watch_mutex);
	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
	mutex_unlock(&rbd_dev->watch_mutex);
}
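
/*
 * For illustration: with a cookie prefix of "auto" (per the
 * RBD_LOCK_COOKIE_PREFIX define earlier in this file) and a
 * hypothetical watch cookie of 18446744073709, the string written into
 * @buf would be "auto 18446744073709".  Embedding the watch cookie ties
 * the lock to this client's watch, which is what find_watcher() below
 * relies on to decide whether a lock holder is still alive.
 */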

static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
{
	struct rbd_client_id cid = rbd_get_cid(rbd_dev);

	strcpy(rbd_dev->lock_cookie, cookie);
	rbd_set_owner_cid(rbd_dev, &cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
}

/*
 * lock_rwsem must be held for write
 */
static int rbd_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
		rbd_dev->lock_cookie[0] != '\0');

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
			    RBD_LOCK_TAG, "", 0);
	if (ret)
		return ret;

	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
	__rbd_lock(rbd_dev, cookie);
	return 0;
}

/*
 * lock_rwsem must be held for write
 */
static void rbd_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
		rbd_dev->lock_cookie[0] == '\0');

	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock: %d", ret);

	/* treat errors as the image is unlocked */
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	rbd_dev->lock_cookie[0] = '\0';
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
}

static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
				enum rbd_notify_op notify_op,
				struct page ***preply_pages,
				size_t *preply_len)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
	int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
	char buf[buf_size];
	void *p = buf;

	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);

	/* encode *LockPayload NotifyMessage (op + ClientId) */
	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
	ceph_encode_32(&p, notify_op);
	ceph_encode_64(&p, cid.gid);
	ceph_encode_64(&p, cid.handle);

	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
				&rbd_dev->header_oloc, buf, buf_size,
				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
}
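
/*
 * The encoded payload above is laid out as follows, where the first
 * three fields are the versioned-encoding header emitted by
 * ceph_start_encoding() (CEPH_ENCODING_START_BLK_LEN bytes):
 *
 *	u8   struct_v (2)
 *	u8   struct_compat (1)
 *	le32 len
 *	le32 notify_op
 *	le64 cid.gid
 *	le64 cid.handle
 *
 * which accounts for the 4 + 8 + 8 bytes in buf_size.
 */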

static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
			       enum rbd_notify_op notify_op)
{
	struct page **reply_pages;
	size_t reply_len;

	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
}

static void rbd_notify_acquired_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  acquired_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
}

static void rbd_notify_released_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  released_lock_work);

	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
}

static int rbd_request_lock(struct rbd_device *rbd_dev)
{
	struct page **reply_pages;
	size_t reply_len;
	bool lock_owner_responded = false;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
				   &reply_pages, &reply_len);
	if (ret && ret != -ETIMEDOUT) {
		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
		goto out;
	}

	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
		void *p = page_address(reply_pages[0]);
		void *const end = p + reply_len;
		u32 n;

		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
		while (n--) {
			u8 struct_v;
			u32 len;

			ceph_decode_need(&p, end, 8 + 8, e_inval);
			p += 8 + 8; /* skip gid and cookie */

			ceph_decode_32_safe(&p, end, len, e_inval);
			if (!len)
				continue;

			if (lock_owner_responded) {
				rbd_warn(rbd_dev,
					 "duplicate lock owners detected");
				ret = -EIO;
				goto out;
			}

			lock_owner_responded = true;
			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
						  &struct_v, &len);
			if (ret) {
				rbd_warn(rbd_dev,
					 "failed to decode ResponseMessage: %d",
					 ret);
				goto e_inval;
			}

			ret = ceph_decode_32(&p);
		}
	}

	if (!lock_owner_responded) {
		rbd_warn(rbd_dev, "no lock owners detected");
		ret = -ETIMEDOUT;
	}

out:
	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}

static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
{
	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);

	cancel_delayed_work(&rbd_dev->lock_dwork);
	if (wake_all)
		wake_up_all(&rbd_dev->lock_waitq);
	else
		wake_up(&rbd_dev->lock_waitq);
}

static int get_lock_owner_info(struct rbd_device *rbd_dev,
			       struct ceph_locker **lockers, u32 *num_lockers)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	u8 lock_type;
	char *lock_tag;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
				 &lock_type, &lock_tag, lockers, num_lockers);
	if (ret)
		return ret;

	if (*num_lockers == 0) {
		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
		goto out;
	}

	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
			 lock_tag);
		ret = -EBUSY;
		goto out;
	}

	if (lock_type == CEPH_CLS_LOCK_SHARED) {
		rbd_warn(rbd_dev, "shared lock type detected");
		ret = -EBUSY;
		goto out;
	}

	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
			 (*lockers)[0].id.cookie);
		ret = -EBUSY;
		goto out;
	}

out:
	kfree(lock_tag);
	return ret;
}

static int find_watcher(struct rbd_device *rbd_dev,
			const struct ceph_locker *locker)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_watch_item *watchers;
	u32 num_watchers;
	u64 cookie;
	int i;
	int ret;

	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
				      &rbd_dev->header_oloc, &watchers,
				      &num_watchers);
	if (ret)
		return ret;

	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
	for (i = 0; i < num_watchers; i++) {
		if (!memcmp(&watchers[i].addr, &locker->info.addr,
			    sizeof(locker->info.addr)) &&
		    watchers[i].cookie == cookie) {
			struct rbd_client_id cid = {
				.gid = le64_to_cpu(watchers[i].name.num),
				.handle = cookie,
			};

			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
			     rbd_dev, cid.gid, cid.handle);
			rbd_set_owner_cid(rbd_dev, &cid);
			ret = 1;
			goto out;
		}
	}

	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
	ret = 0;
out:
	kfree(watchers);
	return ret;
}

/*
 * lock_rwsem must be held for write
 */
static int rbd_try_lock(struct rbd_device *rbd_dev)
{
	struct ceph_client *client = rbd_dev->rbd_client->client;
	struct ceph_locker *lockers;
	u32 num_lockers;
	int ret;

	for (;;) {
		ret = rbd_lock(rbd_dev);
		if (ret != -EBUSY)
			return ret;

		/* determine if the current lock holder is still alive */
		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
		if (ret)
			return ret;

		if (num_lockers == 0)
			goto again;

		ret = find_watcher(rbd_dev, lockers);
		if (ret) {
			if (ret > 0)
				ret = 0; /* have to request lock */
			goto out;
		}

		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
			 ENTITY_NAME(lockers[0].id.name));

		ret = ceph_monc_blacklist_add(&client->monc,
					      &lockers[0].info.addr);
		if (ret) {
			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
				 ENTITY_NAME(lockers[0].id.name), ret);
			goto out;
		}

		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
					  lockers[0].id.cookie,
					  &lockers[0].id.name);
		if (ret && ret != -ENOENT)
			goto out;

again:
		ceph_free_lockers(lockers, num_lockers);
	}

out:
	ceph_free_lockers(lockers, num_lockers);
	return ret;
}
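
/*
 * To summarize: a peer's lock is broken only when the peer holds the
 * lock but no longer has a watch established with the matching cookie,
 * i.e. it is presumed dead.  The peer's address is blacklisted first so
 * that a half-dead client can't come back and issue writes after the
 * lock has been taken from it.
 */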

/*
 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
 */
static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
						int *pret)
{
	enum rbd_lock_state lock_state;

	down_read(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (__rbd_is_lock_owner(rbd_dev)) {
		lock_state = rbd_dev->lock_state;
		up_read(&rbd_dev->lock_rwsem);
		return lock_state;
	}

	up_read(&rbd_dev->lock_rwsem);
	down_write(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (!__rbd_is_lock_owner(rbd_dev)) {
		*pret = rbd_try_lock(rbd_dev);
		if (*pret)
			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
	}

	lock_state = rbd_dev->lock_state;
	up_write(&rbd_dev->lock_rwsem);
	return lock_state;
}

static void rbd_acquire_lock(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, lock_dwork);
	enum rbd_lock_state lock_state;
	int ret = 0;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);
again:
	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
		if (lock_state == RBD_LOCK_STATE_LOCKED)
			wake_requests(rbd_dev, true);
		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
		     rbd_dev, lock_state, ret);
		return;
	}

	ret = rbd_request_lock(rbd_dev);
	if (ret == -ETIMEDOUT) {
		goto again; /* treat this as a dead client */
	} else if (ret == -EROFS) {
		rbd_warn(rbd_dev, "peer will not release lock");
		/*
		 * If this is rbd_add_acquire_lock(), we want to fail
		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
		 * want to block.
		 */
		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
			/* wake "rbd map --exclusive" process */
			wake_requests(rbd_dev, false);
		}
	} else if (ret < 0) {
		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
				 RBD_RETRY_DELAY);
	} else {
		/*
		 * lock owner acked, but resend if we don't see them
		 * release the lock
		 */
		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
		     rbd_dev);
		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
	}
}

/*
 * lock_rwsem must be held for write
 */
static bool rbd_release_lock(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
		return false;

	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
	downgrade_write(&rbd_dev->lock_rwsem);
	/*
	 * Ensure that all in-flight IO is flushed.
	 *
	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
	 * may be shared with other devices.
	 */
	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
	up_read(&rbd_dev->lock_rwsem);

	down_write(&rbd_dev->lock_rwsem);
	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
	     rbd_dev->lock_state);
	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
		return false;

	rbd_unlock(rbd_dev);
	/*
	 * Give others a chance to grab the lock - we would re-acquire
	 * almost immediately if we got new IO during ceph_osdc_sync()
	 * otherwise.  We need to ack our own notifications, so this
	 * lock_dwork will be requeued from rbd_wait_state_locked()
	 * after wake_requests() in rbd_handle_released_lock().
	 */
	cancel_delayed_work(&rbd_dev->lock_dwork);
	return true;
}

static void rbd_release_lock_work(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
						  unlock_work);

	down_write(&rbd_dev->lock_rwsem);
	rbd_release_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
}

static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
				     void **p)
{
	struct rbd_client_id cid = { 0 };

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
		down_write(&rbd_dev->lock_rwsem);
		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			/*
			 * we already know that the remote client is
			 * the owner
			 */
			up_write(&rbd_dev->lock_rwsem);
			return;
		}

		rbd_set_owner_cid(rbd_dev, &cid);
		downgrade_write(&rbd_dev->lock_rwsem);
	} else {
		down_read(&rbd_dev->lock_rwsem);
	}

	if (!__rbd_is_lock_owner(rbd_dev))
		wake_requests(rbd_dev, false);
	up_read(&rbd_dev->lock_rwsem);
}

static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
				     void **p)
{
	struct rbd_client_id cid = { 0 };

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
		down_write(&rbd_dev->lock_rwsem);
		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
			     __func__, rbd_dev, cid.gid, cid.handle,
			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
			up_write(&rbd_dev->lock_rwsem);
			return;
		}

		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
		downgrade_write(&rbd_dev->lock_rwsem);
	} else {
		down_read(&rbd_dev->lock_rwsem);
	}

	if (!__rbd_is_lock_owner(rbd_dev))
		wake_requests(rbd_dev, false);
	up_read(&rbd_dev->lock_rwsem);
}

/*
 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
 * ResponseMessage is needed.
 */
static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
				   void **p)
{
	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
	struct rbd_client_id cid = { 0 };
	int result = 1;

	if (struct_v >= 2) {
		cid.gid = ceph_decode_64(p);
		cid.handle = ceph_decode_64(p);
	}

	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
	     cid.handle);
	if (rbd_cid_equal(&cid, &my_cid))
		return result;

	down_read(&rbd_dev->lock_rwsem);
	if (__rbd_is_lock_owner(rbd_dev)) {
		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
			goto out_unlock;

		/*
		 * encode ResponseMessage(0) so the peer can detect
		 * a missing owner
		 */
		result = 0;

		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
			if (!rbd_dev->opts->exclusive) {
				dout("%s rbd_dev %p queueing unlock_work\n",
				     __func__, rbd_dev);
				queue_work(rbd_dev->task_wq,
					   &rbd_dev->unlock_work);
			} else {
				/* refuse to release the lock */
				result = -EROFS;
			}
		}
	}

out_unlock:
	up_read(&rbd_dev->lock_rwsem);
	return result;
}

static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
				     u64 notify_id, u64 cookie, s32 *result)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
	char buf[buf_size];
	int ret;

	if (result) {
		void *p = buf;

		/* encode ResponseMessage */
		ceph_start_encoding(&p, 1, 1,
				    buf_size - CEPH_ENCODING_START_BLK_LEN);
		ceph_encode_32(&p, *result);
	} else {
		buf_size = 0;
	}

	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
				   &rbd_dev->header_oloc, notify_id, cookie,
				   buf, buf_size);
	if (ret)
		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
}

static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
				   u64 cookie)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
}

static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
					  u64 notify_id, u64 cookie, s32 result)
{
	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
}

static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
			 u64 notifier_id, void *data, size_t data_len)
{
	struct rbd_device *rbd_dev = arg;
	void *p = data;
	void *const end = p + data_len;
	u8 struct_v = 0;
	u32 len;
	u32 notify_op;
	int ret;

	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
	     __func__, rbd_dev, cookie, notify_id, data_len);
	if (data_len) {
		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
					  &struct_v, &len);
		if (ret) {
			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
				 ret);
			return;
		}

		notify_op = ceph_decode_32(&p);
	} else {
		/* legacy notification for header updates */
		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
		len = 0;
	}

	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
	switch (notify_op) {
	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_RELEASED_LOCK:
		rbd_handle_released_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_REQUEST_LOCK:
		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
		if (ret <= 0)
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, ret);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_HEADER_UPDATE:
		ret = rbd_dev_refresh(rbd_dev);
		if (ret)
			rbd_warn(rbd_dev, "refresh failed: %d", ret);

		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	default:
		if (rbd_is_lock_owner(rbd_dev))
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, -EOPNOTSUPP);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	}
}

static void __rbd_unregister_watch(struct rbd_device *rbd_dev);

static void rbd_watch_errcb(void *arg, u64 cookie, int err)
{
	struct rbd_device *rbd_dev = arg;

	rbd_warn(rbd_dev, "encountered watch error: %d", err);

	down_write(&rbd_dev->lock_rwsem);
	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
	up_write(&rbd_dev->lock_rwsem);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
		__rbd_unregister_watch(rbd_dev);
		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;

		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
	}
	mutex_unlock(&rbd_dev->watch_mutex);
}

/*
 * watch_mutex must be locked
 */
static int __rbd_register_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_linger_request *handle;

	rbd_assert(!rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, rbd_watch_cb,
				 rbd_watch_errcb, rbd_dev);
	if (IS_ERR(handle))
		return PTR_ERR(handle);

	rbd_dev->watch_handle = handle;
	return 0;
}

/*
 * watch_mutex must be locked
 */
static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	rbd_assert(rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
	if (ret)
		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);

	rbd_dev->watch_handle = NULL;
}

static int rbd_register_watch(struct rbd_device *rbd_dev)
{
	int ret;

	mutex_lock(&rbd_dev->watch_mutex);
	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
	ret = __rbd_register_watch(rbd_dev);
	if (ret)
		goto out;

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;

out:
	mutex_unlock(&rbd_dev->watch_mutex);
	return ret;
}

static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}

static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
	cancel_tasks_sync(rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
		__rbd_unregister_watch(rbd_dev);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	mutex_unlock(&rbd_dev->watch_mutex);

	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}

/*
 * lock_rwsem must be held for write
 */
static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
				  RBD_LOCK_TAG, cookie);
	if (ret) {
		if (ret != -EOPNOTSUPP)
			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
				 ret);

		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
		if (rbd_release_lock(rbd_dev))
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->lock_dwork, 0);
	} else {
		__rbd_lock(rbd_dev, cookie);
	}
}

static void rbd_reregister_watch(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, watch_dwork);
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	ret = __rbd_register_watch(rbd_dev);
	if (ret) {
		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
		if (ret == -EBLACKLISTED || ret == -ENOENT) {
			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
			wake_requests(rbd_dev, true);
		} else {
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->watch_dwork,
					   RBD_RETRY_DELAY);
		}
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
	mutex_unlock(&rbd_dev->watch_mutex);

	down_write(&rbd_dev->lock_rwsem);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		rbd_reacquire_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			       struct ceph_object_id *oid,
			       struct ceph_object_locator *oloc,
			       const char *method_name,
			       const void *outbound,
			       size_t outbound_size,
			       void *inbound,
			       size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct page *req_page = NULL;
	struct page *reply_page;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	if (outbound) {
		if (outbound_size > PAGE_SIZE)
			return -E2BIG;

		req_page = alloc_page(GFP_KERNEL);
		if (!req_page)
			return -ENOMEM;

		memcpy(page_address(req_page), outbound, outbound_size);
	}

	reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		if (req_page)
			__free_page(req_page);
		return -ENOMEM;
	}

	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
			     reply_page, &inbound_size);
	if (!ret) {
		memcpy(inbound, page_address(reply_page), inbound_size);
		ret = inbound_size;
	}

	if (req_page)
		__free_page(req_page);
	__free_page(reply_page);
	return ret;
}
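
/*
 * A typical use, sketched for illustration only (modeled on how the
 * snapshot size is fetched elsewhere in this driver via the "get_size"
 * method of the rbd object class; the reply layout is an assumption
 * documented by that caller):
 *
 *	__le64 snapid = cpu_to_le64(snap_id);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 *	if (ret < sizeof(size_buf))
 *		return ret < 0 ? ret : -ERANGE;
 */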

/*
 * lock_rwsem must be held for read
 */
static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
{
	DEFINE_WAIT(wait);

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		schedule();
		down_read(&rbd_dev->lock_rwsem);
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
		 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));

	finish_wait(&rbd_dev->lock_waitq, &wait);
}

static void rbd_queue_workfn(struct work_struct *work)
{
	struct request *rq = blk_mq_rq_from_pdu(work);
	struct rbd_device *rbd_dev = rq->q->queuedata;
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	enum obj_operation_type op_type;
	u64 mapping_size;
	bool must_be_locked;
	int result;

	switch (req_op(rq)) {
	case REQ_OP_DISCARD:
	case REQ_OP_WRITE_ZEROES:
		op_type = OBJ_OP_DISCARD;
		break;
	case REQ_OP_WRITE:
		op_type = OBJ_OP_WRITE;
		break;
	case REQ_OP_READ:
		op_type = OBJ_OP_READ;
		break;
	default:
		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
		result = -EIO;
		goto err;
	}

	/* Ignore/skip any zero-length requests */

	if (!length) {
		dout("%s: zero-length request\n", __func__);
		result = 0;
		goto err_rq;
	}

	rbd_assert(op_type == OBJ_OP_READ ||
		   rbd_dev->spec->snap_id == CEPH_NOSNAP);

	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
	 * still possible the snapshot will have disappeared by the
	 * time our request arrives at the osd, but there's no sense in
	 * sending it if we already know.
	 */
	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
		dout("request for non-existent snapshot\n");
		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
		result = -ENXIO;
		goto err_rq;
	}

	if (offset && length > U64_MAX - offset + 1) {
		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
			 length);
		result = -EINVAL;
		goto err_rq; /* Shouldn't happen */
	}

	blk_mq_start_request(rq);

	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (op_type != OBJ_OP_READ) {
		snapc = rbd_dev->header.snapc;
		ceph_get_snap_context(snapc);
	}
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, mapping_size);
		result = -EIO;
		goto err_rq;
	}

	must_be_locked =
	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
	if (must_be_locked) {
		down_read(&rbd_dev->lock_rwsem);
		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			if (rbd_dev->opts->exclusive) {
				rbd_warn(rbd_dev, "exclusive lock required");
				result = -EROFS;
				goto err_unlock;
			}
			rbd_wait_state_locked(rbd_dev);
		}
		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			result = -EBLACKLISTED;
			goto err_unlock;
		}
	}

	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_unlock;
	}
	img_request->rq = rq;
	snapc = NULL; /* img_request consumes a ref */

	if (op_type == OBJ_OP_DISCARD)
		result = rbd_img_fill_nodata(img_request, offset, length);
	else
		result = rbd_img_fill_from_bio(img_request, offset, length,
					       rq->bio);
	if (result)
3649 goto err_img_request;
bf0d5f50 3650
efbd1a11 3651 rbd_img_request_submit(img_request);
ed95b21a
ID
3652 if (must_be_locked)
3653 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 3654 return;
bf0d5f50 3655
bc1ecc65
ID
3656err_img_request:
3657 rbd_img_request_put(img_request);
ed95b21a
ID
3658err_unlock:
3659 if (must_be_locked)
3660 up_read(&rbd_dev->lock_rwsem);
bc1ecc65
ID
3661err_rq:
3662 if (result)
3663 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 3664 obj_op_name(op_type), length, offset, result);
e96a650a 3665 ceph_put_snap_context(snapc);
7ad18afa 3666err:
2a842aca 3667 blk_mq_end_request(rq, errno_to_blk_status(result));
bc1ecc65 3668}
bf0d5f50 3669
fc17b653 3670static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
7ad18afa 3671 const struct blk_mq_queue_data *bd)
bc1ecc65 3672{
7ad18afa
CH
3673 struct request *rq = bd->rq;
3674 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 3675
7ad18afa 3676 queue_work(rbd_wq, work);
fc17b653 3677 return BLK_STS_OK;
bf0d5f50
AE
3678}
3679
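/*
 * Editorial note (not part of the original source): tag_set.cmd_size
 * (set in rbd_init_disk() below) reserves a struct work_struct in each
 * request's per-request payload, so request and work item convert both
 * ways:
 *
 *   struct work_struct *work = blk_mq_rq_to_pdu(rq);   - in rbd_queue_rq()
 *   struct request *rq = blk_mq_rq_from_pdu(work);     - in rbd_queue_workfn()
 */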
602adf40
YS
3680static void rbd_free_disk(struct rbd_device *rbd_dev)
3681{
5769ed0c
ID
3682 blk_cleanup_queue(rbd_dev->disk->queue);
3683 blk_mq_free_tag_set(&rbd_dev->tag_set);
3684 put_disk(rbd_dev->disk);
a0cab924 3685 rbd_dev->disk = NULL;
602adf40
YS
3686}
3687
788e2df3 3688static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
fe5478e0
ID
3689 struct ceph_object_id *oid,
3690 struct ceph_object_locator *oloc,
3691 void *buf, int buf_len)
788e2df3
AE
3692
3693{
fe5478e0
ID
3694 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3695 struct ceph_osd_request *req;
3696 struct page **pages;
3697 int num_pages = calc_pages_for(0, buf_len);
788e2df3
AE
3698 int ret;
3699
fe5478e0
ID
3700 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3701 if (!req)
3702 return -ENOMEM;
788e2df3 3703
fe5478e0
ID
3704 ceph_oid_copy(&req->r_base_oid, oid);
3705 ceph_oloc_copy(&req->r_base_oloc, oloc);
3706 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 3707
fe5478e0 3708 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 3709 if (ret)
fe5478e0 3710 goto out_req;
788e2df3 3711
fe5478e0
ID
3712 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3713 if (IS_ERR(pages)) {
3714 ret = PTR_ERR(pages);
3715 goto out_req;
3716 }
1ceae7ef 3717
fe5478e0
ID
3718 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3719 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3720 true);
3721
3722 ceph_osdc_start_request(osdc, req, false);
3723 ret = ceph_osdc_wait_request(osdc, req);
3724 if (ret >= 0)
3725 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 3726
fe5478e0
ID
3727out_req:
3728 ceph_osdc_put_request(req);
788e2df3
AE
3729 return ret;
3730}
3731
602adf40 3732/*
662518b1
AE
3733 * Read the complete header for the given rbd device. On successful
3734 * return, the rbd_dev->header field will contain up-to-date
3735 * information about the image.
602adf40 3736 */
99a41ebc 3737static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3738{
4156d998 3739 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3740 u32 snap_count = 0;
4156d998
AE
3741 u64 names_size = 0;
3742 u32 want_count;
3743 int ret;
602adf40 3744
00f1f36f 3745 /*
4156d998
AE
3746 * The complete header will include an array of its 64-bit
3747 * snapshot ids, followed by the names of those snapshots as
3748 * a contiguous block of NUL-terminated strings. Note that
3749 * the number of snapshots could change by the time we read
3750 * it in, in which case we re-read it.
00f1f36f 3751 */
4156d998
AE
3752 do {
3753 size_t size;
3754
3755 kfree(ondisk);
3756
3757 size = sizeof (*ondisk);
3758 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3759 size += names_size;
3760 ondisk = kmalloc(size, GFP_KERNEL);
3761 if (!ondisk)
662518b1 3762 return -ENOMEM;
4156d998 3763
fe5478e0
ID
3764 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3765 &rbd_dev->header_oloc, ondisk, size);
4156d998 3766 if (ret < 0)
662518b1 3767 goto out;
c0cd10db 3768 if ((size_t)ret < size) {
4156d998 3769 ret = -ENXIO;
06ecc6cb
AE
3770 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3771 size, ret);
662518b1 3772 goto out;
4156d998
AE
3773 }
3774 if (!rbd_dev_ondisk_valid(ondisk)) {
3775 ret = -ENXIO;
06ecc6cb 3776 rbd_warn(rbd_dev, "invalid header");
662518b1 3777 goto out;
81e759fb 3778 }
602adf40 3779
4156d998
AE
3780 names_size = le64_to_cpu(ondisk->snap_names_len);
3781 want_count = snap_count;
3782 snap_count = le32_to_cpu(ondisk->snap_count);
3783 } while (snap_count != want_count);
00f1f36f 3784
662518b1
AE
3785 ret = rbd_header_from_disk(rbd_dev, ondisk);
3786out:
4156d998
AE
3787 kfree(ondisk);
3788
3789 return ret;
602adf40
YS
3790}
3791
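/*
 * Editorial sketch (derived from the sizing logic above, not part of
 * the original source): the v1 header object is read in one shot as
 *
 *   struct rbd_image_header_ondisk  ondisk;                  (fixed prefix)
 *   struct rbd_image_snap_ondisk    snaps[snap_count];       (one per snapshot)
 *   char                            snap_names[names_size];  (NUL-separated)
 */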
15228ede
AE
3792/*
3793 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3794 * has disappeared from the (just updated) snapshot context.
3795 */
3796static void rbd_exists_validate(struct rbd_device *rbd_dev)
3797{
3798 u64 snap_id;
3799
3800 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3801 return;
3802
3803 snap_id = rbd_dev->spec->snap_id;
3804 if (snap_id == CEPH_NOSNAP)
3805 return;
3806
3807 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3808 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3809}
3810
9875201e
JD
3811static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3812{
3813 sector_t size;
9875201e
JD
3814
3815 /*
811c6688
ID
3816 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3817 * try to update its size. If REMOVING is set, updating size
3818 * is just useless work since the device can't be opened.
9875201e 3819 */
811c6688
ID
3820 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3821 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
9875201e
JD
3822 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3823 dout("setting size to %llu sectors", (unsigned long long)size);
3824 set_capacity(rbd_dev->disk, size);
3825 revalidate_disk(rbd_dev->disk);
3826 }
3827}
3828
cc4a38bd 3829static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3830{
e627db08 3831 u64 mapping_size;
1fe5e993
AE
3832 int ret;
3833
cfbf6377 3834 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 3835 mapping_size = rbd_dev->mapping.size;
a720ae09
ID
3836
3837 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 3838 if (ret)
73e39e4d 3839 goto out;
15228ede 3840
e8f59b59
ID
3841 /*
3842 * If there is a parent, see if it has disappeared due to the
3843 * mapped image getting flattened.
3844 */
3845 if (rbd_dev->parent) {
3846 ret = rbd_dev_v2_parent_info(rbd_dev);
3847 if (ret)
73e39e4d 3848 goto out;
e8f59b59
ID
3849 }
3850
5ff1108c 3851 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 3852 rbd_dev->mapping.size = rbd_dev->header.image_size;
5ff1108c
ID
3853 } else {
3854 /* validate mapped snapshot's EXISTS flag */
3855 rbd_exists_validate(rbd_dev);
3856 }
15228ede 3857
73e39e4d 3858out:
cfbf6377 3859 up_write(&rbd_dev->header_rwsem);
73e39e4d 3860 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 3861 rbd_dev_update_size(rbd_dev);
1fe5e993 3862
73e39e4d 3863 return ret;
1fe5e993
AE
3864}
3865
d6296d39
CH
3866static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3867 unsigned int hctx_idx, unsigned int numa_node)
7ad18afa
CH
3868{
3869 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3870
3871 INIT_WORK(work, rbd_queue_workfn);
3872 return 0;
3873}
3874
f363b089 3875static const struct blk_mq_ops rbd_mq_ops = {
7ad18afa 3876 .queue_rq = rbd_queue_rq,
7ad18afa
CH
3877 .init_request = rbd_init_request,
3878};
3879
602adf40
YS
3880static int rbd_init_disk(struct rbd_device *rbd_dev)
3881{
3882 struct gendisk *disk;
3883 struct request_queue *q;
593a9e7b 3884 u64 segment_size;
7ad18afa 3885 int err;
602adf40 3886
602adf40 3887 /* create gendisk info */
7e513d43
ID
3888 disk = alloc_disk(single_major ?
3889 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3890 RBD_MINORS_PER_MAJOR);
602adf40 3891 if (!disk)
1fcdb8aa 3892 return -ENOMEM;
602adf40 3893
f0f8cef5 3894 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3895 rbd_dev->dev_id);
602adf40 3896 disk->major = rbd_dev->major;
dd82fff1 3897 disk->first_minor = rbd_dev->minor;
7e513d43
ID
3898 if (single_major)
3899 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
3900 disk->fops = &rbd_bd_ops;
3901 disk->private_data = rbd_dev;
3902
7ad18afa
CH
3903 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3904 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 3905 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 3906 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 3907 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
7ad18afa
CH
3908 rbd_dev->tag_set.nr_hw_queues = 1;
3909 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3910
3911 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3912 if (err)
602adf40 3913 goto out_disk;
029bcbd8 3914
7ad18afa
CH
3915 q = blk_mq_init_queue(&rbd_dev->tag_set);
3916 if (IS_ERR(q)) {
3917 err = PTR_ERR(q);
3918 goto out_tag_set;
3919 }
3920
d8a2c89c
ID
3921 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
3922 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 3923
029bcbd8 3924 /* set io sizes to object size */
593a9e7b
AE
3925 segment_size = rbd_obj_bytes(&rbd_dev->header);
3926 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 3927 q->limits.max_sectors = queue_max_hw_sectors(q);
21acdf45 3928 blk_queue_max_segments(q, USHRT_MAX);
24f1df60 3929 blk_queue_max_segment_size(q, UINT_MAX);
593a9e7b
AE
3930 blk_queue_io_min(q, segment_size);
3931 blk_queue_io_opt(q, segment_size);
029bcbd8 3932
90e98c52
GZ
3933 /* enable the discard support */
3934 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3935 q->limits.discard_granularity = segment_size;
2bb4cd5c 3936 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
6ac56951 3937 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
90e98c52 3938
bae818ee 3939 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
dc3b17cc 3940 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
bae818ee 3941
5769ed0c
ID
3942 /*
3943 * disk_release() expects a queue ref from add_disk() and will
3944 * put it. Hold an extra ref until add_disk() is called.
3945 */
3946 WARN_ON(!blk_get_queue(q));
602adf40 3947 disk->queue = q;
602adf40
YS
3948 q->queuedata = rbd_dev;
3949
3950 rbd_dev->disk = disk;
602adf40 3951
602adf40 3952 return 0;
7ad18afa
CH
3953out_tag_set:
3954 blk_mq_free_tag_set(&rbd_dev->tag_set);
602adf40
YS
3955out_disk:
3956 put_disk(disk);
7ad18afa 3957 return err;
602adf40
YS
3958}
3959
dfc5606d
YS
3960/*
3961 sysfs
3962*/
3963
593a9e7b
AE
3964static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3965{
3966 return container_of(dev, struct rbd_device, dev);
3967}
3968
dfc5606d
YS
3969static ssize_t rbd_size_show(struct device *dev,
3970 struct device_attribute *attr, char *buf)
3971{
593a9e7b 3972 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3973
fc71d833
AE
3974 return sprintf(buf, "%llu\n",
3975 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3976}
3977
34b13184
AE
3978/*
3979 * Note this shows the features for whatever's mapped, which is not
3980 * necessarily the base image.
3981 */
3982static ssize_t rbd_features_show(struct device *dev,
3983 struct device_attribute *attr, char *buf)
3984{
3985 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3986
3987 return sprintf(buf, "0x%016llx\n",
fc71d833 3988 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3989}
3990
dfc5606d
YS
3991static ssize_t rbd_major_show(struct device *dev,
3992 struct device_attribute *attr, char *buf)
3993{
593a9e7b 3994 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3995
fc71d833
AE
3996 if (rbd_dev->major)
3997 return sprintf(buf, "%d\n", rbd_dev->major);
3998
3999 return sprintf(buf, "(none)\n");
dd82fff1
ID
4000}
4001
4002static ssize_t rbd_minor_show(struct device *dev,
4003 struct device_attribute *attr, char *buf)
4004{
4005 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4006
dd82fff1 4007 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
4008}
4009
005a07bf
ID
4010static ssize_t rbd_client_addr_show(struct device *dev,
4011 struct device_attribute *attr, char *buf)
4012{
4013 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4014 struct ceph_entity_addr *client_addr =
4015 ceph_client_addr(rbd_dev->rbd_client->client);
4016
4017 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4018 le32_to_cpu(client_addr->nonce));
4019}
4020
dfc5606d
YS
4021static ssize_t rbd_client_id_show(struct device *dev,
4022 struct device_attribute *attr, char *buf)
602adf40 4023{
593a9e7b 4024 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4025
1dbb4399 4026 return sprintf(buf, "client%lld\n",
033268a5 4027 ceph_client_gid(rbd_dev->rbd_client->client));
602adf40
YS
4028}
4029
267fb90b
MC
4030static ssize_t rbd_cluster_fsid_show(struct device *dev,
4031 struct device_attribute *attr, char *buf)
4032{
4033 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4034
4035 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4036}
4037
0d6d1e9c
MC
4038static ssize_t rbd_config_info_show(struct device *dev,
4039 struct device_attribute *attr, char *buf)
4040{
4041 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4042
4043 return sprintf(buf, "%s\n", rbd_dev->config_info);
602adf40
YS
4044}
4045
dfc5606d
YS
4046static ssize_t rbd_pool_show(struct device *dev,
4047 struct device_attribute *attr, char *buf)
602adf40 4048{
593a9e7b 4049 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4050
0d7dbfce 4051 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
4052}
4053
9bb2f334
AE
4054static ssize_t rbd_pool_id_show(struct device *dev,
4055 struct device_attribute *attr, char *buf)
4056{
4057 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4058
0d7dbfce 4059 return sprintf(buf, "%llu\n",
fc71d833 4060 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
4061}
4062
dfc5606d
YS
4063static ssize_t rbd_name_show(struct device *dev,
4064 struct device_attribute *attr, char *buf)
4065{
593a9e7b 4066 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4067
a92ffdf8
AE
4068 if (rbd_dev->spec->image_name)
4069 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4070
4071 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
4072}
4073
589d30e0
AE
4074static ssize_t rbd_image_id_show(struct device *dev,
4075 struct device_attribute *attr, char *buf)
4076{
4077 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4078
0d7dbfce 4079 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
4080}
4081
34b13184
AE
4082/*
4083 * Shows the name of the currently-mapped snapshot (or
4084 * RBD_SNAP_HEAD_NAME for the base image).
4085 */
dfc5606d
YS
4086static ssize_t rbd_snap_show(struct device *dev,
4087 struct device_attribute *attr,
4088 char *buf)
4089{
593a9e7b 4090 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4091
0d7dbfce 4092 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
4093}
4094
92a58671
MC
4095static ssize_t rbd_snap_id_show(struct device *dev,
4096 struct device_attribute *attr, char *buf)
4097{
4098 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4099
4100 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4101}
4102
86b00e0d 4103/*
ff96128f
ID
4104 * For a v2 image, shows the chain of parent images, separated by empty
4105 * lines. For v1 images or if there is no parent, shows "(no parent
4106 * image)".
86b00e0d
AE
4107 */
4108static ssize_t rbd_parent_show(struct device *dev,
ff96128f
ID
4109 struct device_attribute *attr,
4110 char *buf)
86b00e0d
AE
4111{
4112 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4113 ssize_t count = 0;
86b00e0d 4114
ff96128f 4115 if (!rbd_dev->parent)
86b00e0d
AE
4116 return sprintf(buf, "(no parent image)\n");
4117
ff96128f
ID
4118 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4119 struct rbd_spec *spec = rbd_dev->parent_spec;
4120
4121 count += sprintf(&buf[count], "%s"
4122 "pool_id %llu\npool_name %s\n"
4123 "image_id %s\nimage_name %s\n"
4124 "snap_id %llu\nsnap_name %s\n"
4125 "overlap %llu\n",
4126 !count ? "" : "\n", /* first? */
4127 spec->pool_id, spec->pool_name,
4128 spec->image_id, spec->image_name ?: "(unknown)",
4129 spec->snap_id, spec->snap_name,
4130 rbd_dev->parent_overlap);
4131 }
4132
4133 return count;
86b00e0d
AE
4134}
4135
dfc5606d
YS
4136static ssize_t rbd_image_refresh(struct device *dev,
4137 struct device_attribute *attr,
4138 const char *buf,
4139 size_t size)
4140{
593a9e7b 4141 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4142 int ret;
602adf40 4143
cc4a38bd 4144 ret = rbd_dev_refresh(rbd_dev);
e627db08 4145 if (ret)
52bb1f9b 4146 return ret;
b813623a 4147
52bb1f9b 4148 return size;
dfc5606d 4149}
602adf40 4150
dfc5606d 4151static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4152static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4153static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4154static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4155static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4156static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4157static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4158static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4159static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4160static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4161static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4162static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
4163static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4164static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4165static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4166static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
4167
4168static struct attribute *rbd_attrs[] = {
4169 &dev_attr_size.attr,
34b13184 4170 &dev_attr_features.attr,
dfc5606d 4171 &dev_attr_major.attr,
dd82fff1 4172 &dev_attr_minor.attr,
005a07bf 4173 &dev_attr_client_addr.attr,
dfc5606d 4174 &dev_attr_client_id.attr,
267fb90b 4175 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4176 &dev_attr_config_info.attr,
dfc5606d 4177 &dev_attr_pool.attr,
9bb2f334 4178 &dev_attr_pool_id.attr,
dfc5606d 4179 &dev_attr_name.attr,
589d30e0 4180 &dev_attr_image_id.attr,
dfc5606d 4181 &dev_attr_current_snap.attr,
92a58671 4182 &dev_attr_snap_id.attr,
86b00e0d 4183 &dev_attr_parent.attr,
dfc5606d 4184 &dev_attr_refresh.attr,
dfc5606d
YS
4185 NULL
4186};
4187
4188static struct attribute_group rbd_attr_group = {
4189 .attrs = rbd_attrs,
4190};
4191
4192static const struct attribute_group *rbd_attr_groups[] = {
4193 &rbd_attr_group,
4194 NULL
4195};
4196
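/*
 * Editorial example (illustrative, not part of the original source):
 * once registered, these attributes appear under the device's sysfs
 * directory, e.g. for rbd0:
 *
 *   /sys/bus/rbd/devices/0/size
 *   /sys/bus/rbd/devices/0/pool
 *   /sys/bus/rbd/devices/0/current_snap
 *
 * and writing anything to /sys/bus/rbd/devices/0/refresh invokes
 * rbd_image_refresh() -> rbd_dev_refresh().
 */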
6cac4695 4197static void rbd_dev_release(struct device *dev);
dfc5606d 4198
b9942bc9 4199static const struct device_type rbd_device_type = {
dfc5606d
YS
4200 .name = "rbd",
4201 .groups = rbd_attr_groups,
6cac4695 4202 .release = rbd_dev_release,
dfc5606d
YS
4203};
4204
8b8fb99c
AE
4205static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4206{
4207 kref_get(&spec->kref);
4208
4209 return spec;
4210}
4211
4212static void rbd_spec_free(struct kref *kref);
4213static void rbd_spec_put(struct rbd_spec *spec)
4214{
4215 if (spec)
4216 kref_put(&spec->kref, rbd_spec_free);
4217}
4218
4219static struct rbd_spec *rbd_spec_alloc(void)
4220{
4221 struct rbd_spec *spec;
4222
4223 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4224 if (!spec)
4225 return NULL;
04077599
ID
4226
4227 spec->pool_id = CEPH_NOPOOL;
4228 spec->snap_id = CEPH_NOSNAP;
8b8fb99c
AE
4229 kref_init(&spec->kref);
4230
8b8fb99c
AE
4231 return spec;
4232}
4233
4234static void rbd_spec_free(struct kref *kref)
4235{
4236 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4237
4238 kfree(spec->pool_name);
4239 kfree(spec->image_id);
4240 kfree(spec->image_name);
4241 kfree(spec->snap_name);
4242 kfree(spec);
4243}
4244
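/*
 * Editorial sketch (hypothetical helper, not part of the original
 * source): typical rbd_spec lifetime under the kref scheme above.
 */
static void example_rbd_spec_lifetime(void)
{
	struct rbd_spec *spec = rbd_spec_alloc();	/* refcount 1 */

	if (!spec)
		return;
	rbd_spec_get(spec);	/* a second user takes a reference: 2 */
	rbd_spec_put(spec);	/* back to 1, spec still valid */
	rbd_spec_put(spec);	/* 0 - rbd_spec_free() runs */
}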
1643dfa4 4245static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4246{
99d16943 4247 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4248 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4249
c41d13a3 4250 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4251 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4252 kfree(rbd_dev->config_info);
c41d13a3 4253
dd5ac32d
ID
4254 rbd_put_client(rbd_dev->rbd_client);
4255 rbd_spec_put(rbd_dev->spec);
4256 kfree(rbd_dev->opts);
4257 kfree(rbd_dev);
1643dfa4
ID
4258}
4259
4260static void rbd_dev_release(struct device *dev)
4261{
4262 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4263 bool need_put = !!rbd_dev->opts;
4264
4265 if (need_put) {
4266 destroy_workqueue(rbd_dev->task_wq);
4267 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4268 }
4269
4270 rbd_dev_free(rbd_dev);
dd5ac32d
ID
4271
4272 /*
4273 * This is racy, but way better than dropping the module ref
4274 * outside of the release callback. The race window is pretty
4275 * small, so doing something similar to dm (dm-builtin.c) is overkill.
4276 */
4277 if (need_put)
4278 module_put(THIS_MODULE);
4279}
4280
1643dfa4
ID
4281static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4282 struct rbd_spec *spec)
c53d5893
AE
4283{
4284 struct rbd_device *rbd_dev;
4285
1643dfa4 4286 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
c53d5893
AE
4287 if (!rbd_dev)
4288 return NULL;
4289
4290 spin_lock_init(&rbd_dev->lock);
4291 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4292 init_rwsem(&rbd_dev->header_rwsem);
4293
7e97332e 4294 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4295 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4296 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4297
99d16943
ID
4298 mutex_init(&rbd_dev->watch_mutex);
4299 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4300 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4301
ed95b21a
ID
4302 init_rwsem(&rbd_dev->lock_rwsem);
4303 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4304 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4305 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4306 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4307 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4308 init_waitqueue_head(&rbd_dev->lock_waitq);
4309
dd5ac32d
ID
4310 rbd_dev->dev.bus = &rbd_bus_type;
4311 rbd_dev->dev.type = &rbd_device_type;
4312 rbd_dev->dev.parent = &rbd_root_dev;
dd5ac32d
ID
4313 device_initialize(&rbd_dev->dev);
4314
c53d5893 4315 rbd_dev->rbd_client = rbdc;
d147543d 4316 rbd_dev->spec = spec;
0903e875 4317
1643dfa4
ID
4318 return rbd_dev;
4319}
4320
4321/*
4322 * Create a mapping rbd_dev.
4323 */
4324static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4325 struct rbd_spec *spec,
4326 struct rbd_options *opts)
4327{
4328 struct rbd_device *rbd_dev;
4329
4330 rbd_dev = __rbd_dev_create(rbdc, spec);
4331 if (!rbd_dev)
4332 return NULL;
4333
4334 rbd_dev->opts = opts;
4335
4336 /* get an id and fill in device name */
4337 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4338 minor_to_rbd_dev_id(1 << MINORBITS),
4339 GFP_KERNEL);
4340 if (rbd_dev->dev_id < 0)
4341 goto fail_rbd_dev;
4342
4343 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4344 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4345 rbd_dev->name);
4346 if (!rbd_dev->task_wq)
4347 goto fail_dev_id;
dd5ac32d 4348
1643dfa4
ID
4349 /* we have a ref from do_rbd_add() */
4350 __module_get(THIS_MODULE);
dd5ac32d 4351
1643dfa4 4352 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4353 return rbd_dev;
1643dfa4
ID
4354
4355fail_dev_id:
4356 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4357fail_rbd_dev:
4358 rbd_dev_free(rbd_dev);
4359 return NULL;
c53d5893
AE
4360}
4361
4362static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4363{
dd5ac32d
ID
4364 if (rbd_dev)
4365 put_device(&rbd_dev->dev);
c53d5893
AE
4366}
4367
9d475de5
AE
4368/*
4369 * Get the size and object order for an image snapshot, or if
4370 * snap_id is CEPH_NOSNAP, gets this information for the base
4371 * image.
4372 */
4373static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4374 u8 *order, u64 *snap_size)
4375{
4376 __le64 snapid = cpu_to_le64(snap_id);
4377 int ret;
4378 struct {
4379 u8 order;
4380 __le64 size;
4381 } __attribute__ ((packed)) size_buf = { 0 };
4382
ecd4a68a
ID
4383 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4384 &rbd_dev->header_oloc, "get_size",
4385 &snapid, sizeof(snapid),
4386 &size_buf, sizeof(size_buf));
36be9a76 4387 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
4388 if (ret < 0)
4389 return ret;
57385b51
AE
4390 if (ret < sizeof (size_buf))
4391 return -ERANGE;
9d475de5 4392
c3545579 4393 if (order) {
c86f86e9 4394 *order = size_buf.order;
c3545579
JD
4395 dout(" order %u", (unsigned int)*order);
4396 }
9d475de5
AE
4397 *snap_size = le64_to_cpu(size_buf.size);
4398
c3545579
JD
4399 dout(" snap_id 0x%016llx snap_size = %llu\n",
4400 (unsigned long long)snap_id,
57385b51 4401 (unsigned long long)*snap_size);
9d475de5
AE
4402
4403 return 0;
4404}
4405
4406static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4407{
4408 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4409 &rbd_dev->header.obj_order,
4410 &rbd_dev->header.image_size);
4411}
4412
1e130199
AE
4413static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4414{
4415 void *reply_buf;
4416 int ret;
4417 void *p;
4418
4419 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4420 if (!reply_buf)
4421 return -ENOMEM;
4422
ecd4a68a
ID
4423 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4424 &rbd_dev->header_oloc, "get_object_prefix",
4425 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4426 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
4427 if (ret < 0)
4428 goto out;
4429
4430 p = reply_buf;
4431 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
4432 p + ret, NULL, GFP_NOIO);
4433 ret = 0;
1e130199
AE
4434
4435 if (IS_ERR(rbd_dev->header.object_prefix)) {
4436 ret = PTR_ERR(rbd_dev->header.object_prefix);
4437 rbd_dev->header.object_prefix = NULL;
4438 } else {
4439 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4440 }
1e130199
AE
4441out:
4442 kfree(reply_buf);
4443
4444 return ret;
4445}
4446
b1b5402a
AE
4447static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4448 u64 *snap_features)
4449{
4450 __le64 snapid = cpu_to_le64(snap_id);
4451 struct {
4452 __le64 features;
4453 __le64 incompat;
4157976b 4454 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4455 u64 unsup;
b1b5402a
AE
4456 int ret;
4457
ecd4a68a
ID
4458 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4459 &rbd_dev->header_oloc, "get_features",
4460 &snapid, sizeof(snapid),
4461 &features_buf, sizeof(features_buf));
36be9a76 4462 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
4463 if (ret < 0)
4464 return ret;
57385b51
AE
4465 if (ret < sizeof (features_buf))
4466 return -ERANGE;
d889140c 4467
d3767f0f
ID
4468 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4469 if (unsup) {
4470 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4471 unsup);
b8f5c6ed 4472 return -ENXIO;
d3767f0f 4473 }
d889140c 4474
b1b5402a
AE
4475 *snap_features = le64_to_cpu(features_buf.features);
4476
4477 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
4478 (unsigned long long)snap_id,
4479 (unsigned long long)*snap_features,
4480 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
4481
4482 return 0;
4483}
4484
4485static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4486{
4487 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4488 &rbd_dev->header.features);
4489}
4490
86b00e0d
AE
4491static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4492{
4493 struct rbd_spec *parent_spec;
4494 size_t size;
4495 void *reply_buf = NULL;
4496 __le64 snapid;
4497 void *p;
4498 void *end;
642a2537 4499 u64 pool_id;
86b00e0d 4500 char *image_id;
3b5cf2a2 4501 u64 snap_id;
86b00e0d 4502 u64 overlap;
86b00e0d
AE
4503 int ret;
4504
4505 parent_spec = rbd_spec_alloc();
4506 if (!parent_spec)
4507 return -ENOMEM;
4508
4509 size = sizeof (__le64) + /* pool_id */
4510 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4511 sizeof (__le64) + /* snap_id */
4512 sizeof (__le64); /* overlap */
4513 reply_buf = kmalloc(size, GFP_KERNEL);
4514 if (!reply_buf) {
4515 ret = -ENOMEM;
4516 goto out_err;
4517 }
4518
4d9b67cd 4519 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
ecd4a68a
ID
4520 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4521 &rbd_dev->header_oloc, "get_parent",
4522 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4523 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
4524 if (ret < 0)
4525 goto out_err;
4526
86b00e0d 4527 p = reply_buf;
57385b51
AE
4528 end = reply_buf + ret;
4529 ret = -ERANGE;
642a2537 4530 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
4531 if (pool_id == CEPH_NOPOOL) {
4532 /*
4533 * Either the parent never existed, or we have
4534 * record of it but the image got flattened so it no
4535 * longer has a parent. When the parent of a
4536 * layered image disappears we immediately set the
4537 * overlap to 0. The effect of this is that all new
4538 * requests will be treated as if the image had no
4539 * parent.
4540 */
4541 if (rbd_dev->parent_overlap) {
4542 rbd_dev->parent_overlap = 0;
392a9dad
AE
4543 rbd_dev_parent_put(rbd_dev);
4544 pr_info("%s: clone image has been flattened\n",
4545 rbd_dev->disk->disk_name);
4546 }
4547
86b00e0d 4548 goto out; /* No parent? No problem. */
392a9dad 4549 }
86b00e0d 4550
0903e875
AE
4551 /* The ceph file layout needs to fit pool id in 32 bits */
4552
4553 ret = -EIO;
642a2537 4554 if (pool_id > (u64)U32_MAX) {
9584d508 4555 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 4556 (unsigned long long)pool_id, U32_MAX);
57385b51 4557 goto out_err;
c0cd10db 4558 }
0903e875 4559
979ed480 4560 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
4561 if (IS_ERR(image_id)) {
4562 ret = PTR_ERR(image_id);
4563 goto out_err;
4564 }
3b5cf2a2 4565 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
4566 ceph_decode_64_safe(&p, end, overlap, out_err);
4567
3b5cf2a2
AE
4568 /*
4569 * The parent won't change (except when the clone is
4570 * flattened, which is handled above). So we only need to
4571 * record the parent spec if we have not already done so.
4572 */
4573 if (!rbd_dev->parent_spec) {
4574 parent_spec->pool_id = pool_id;
4575 parent_spec->image_id = image_id;
4576 parent_spec->snap_id = snap_id;
70cf49cf
AE
4577 rbd_dev->parent_spec = parent_spec;
4578 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
4579 } else {
4580 kfree(image_id);
3b5cf2a2
AE
4581 }
4582
4583 /*
cf32bd9c
ID
4584 * We always update the parent overlap. If it's zero we issue
4585 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 4586 */
3b5cf2a2 4587 if (!overlap) {
3b5cf2a2 4588 if (parent_spec) {
cf32bd9c
ID
4589 /* refresh, careful to warn just once */
4590 if (rbd_dev->parent_overlap)
4591 rbd_warn(rbd_dev,
4592 "clone now standalone (overlap became 0)");
3b5cf2a2 4593 } else {
cf32bd9c
ID
4594 /* initial probe */
4595 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 4596 }
70cf49cf 4597 }
cf32bd9c
ID
4598 rbd_dev->parent_overlap = overlap;
4599
86b00e0d
AE
4600out:
4601 ret = 0;
4602out_err:
4603 kfree(reply_buf);
4604 rbd_spec_put(parent_spec);
4605
4606 return ret;
4607}
4608
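/*
 * Editorial sketch (read off the decode calls above, not part of the
 * original source): the "get_parent" reply is decoded as
 *
 *   __le64 pool_id;         - CEPH_NOPOOL if there is no parent
 *   le32-prefixed string    - parent image_id
 *   __le64 snap_id;
 *   __le64 overlap;         - bytes shared with the parent
 */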
cc070d59
AE
4609static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4610{
4611 struct {
4612 __le64 stripe_unit;
4613 __le64 stripe_count;
4614 } __attribute__ ((packed)) striping_info_buf = { 0 };
4615 size_t size = sizeof (striping_info_buf);
4616 void *p;
cc070d59
AE
4617 int ret;
4618
ecd4a68a
ID
4619 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4620 &rbd_dev->header_oloc, "get_stripe_unit_count",
4621 NULL, 0, &striping_info_buf, size);
cc070d59
AE
4622 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4623 if (ret < 0)
4624 return ret;
4625 if (ret < size)
4626 return -ERANGE;
4627
cc070d59 4628 p = &striping_info_buf;
b1331852
ID
4629 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4630 rbd_dev->header.stripe_count = ceph_decode_64(&p);
cc070d59
AE
4631 return 0;
4632}
4633
7e97332e
ID
4634static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4635{
4636 __le64 data_pool_id;
4637 int ret;
4638
4639 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4640 &rbd_dev->header_oloc, "get_data_pool",
4641 NULL, 0, &data_pool_id, sizeof(data_pool_id));
4642 if (ret < 0)
4643 return ret;
4644 if (ret < sizeof(data_pool_id))
4645 return -EBADMSG;
4646
4647 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4648 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4649 return 0;
4650}
4651
9e15b77d
AE
4652static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4653{
ecd4a68a 4654 CEPH_DEFINE_OID_ONSTACK(oid);
9e15b77d
AE
4655 size_t image_id_size;
4656 char *image_id;
4657 void *p;
4658 void *end;
4659 size_t size;
4660 void *reply_buf = NULL;
4661 size_t len = 0;
4662 char *image_name = NULL;
4663 int ret;
4664
4665 rbd_assert(!rbd_dev->spec->image_name);
4666
69e7a02f
AE
4667 len = strlen(rbd_dev->spec->image_id);
4668 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4669 image_id = kmalloc(image_id_size, GFP_KERNEL);
4670 if (!image_id)
4671 return NULL;
4672
4673 p = image_id;
4157976b 4674 end = image_id + image_id_size;
57385b51 4675 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4676
4677 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4678 reply_buf = kmalloc(size, GFP_KERNEL);
4679 if (!reply_buf)
4680 goto out;
4681
ecd4a68a
ID
4682 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4683 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4684 "dir_get_name", image_id, image_id_size,
4685 reply_buf, size);
9e15b77d
AE
4686 if (ret < 0)
4687 goto out;
4688 p = reply_buf;
f40eb349
AE
4689 end = reply_buf + ret;
4690
9e15b77d
AE
4691 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4692 if (IS_ERR(image_name))
4693 image_name = NULL;
4694 else
4695 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4696out:
4697 kfree(reply_buf);
4698 kfree(image_id);
4699
4700 return image_name;
4701}
4702
2ad3d716
AE
4703static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4704{
4705 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4706 const char *snap_name;
4707 u32 which = 0;
4708
4709 /* Skip over names until we find the one we are looking for */
4710
4711 snap_name = rbd_dev->header.snap_names;
4712 while (which < snapc->num_snaps) {
4713 if (!strcmp(name, snap_name))
4714 return snapc->snaps[which];
4715 snap_name += strlen(snap_name) + 1;
4716 which++;
4717 }
4718 return CEPH_NOSNAP;
4719}
4720
4721static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4722{
4723 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4724 u32 which;
4725 bool found = false;
4726 u64 snap_id;
4727
4728 for (which = 0; !found && which < snapc->num_snaps; which++) {
4729 const char *snap_name;
4730
4731 snap_id = snapc->snaps[which];
4732 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
4733 if (IS_ERR(snap_name)) {
4734 /* ignore no-longer existing snapshots */
4735 if (PTR_ERR(snap_name) == -ENOENT)
4736 continue;
4737 else
4738 break;
4739 }
2ad3d716
AE
4740 found = !strcmp(name, snap_name);
4741 kfree(snap_name);
4742 }
4743 return found ? snap_id : CEPH_NOSNAP;
4744}
4745
4746/*
4747 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4748 * no snapshot by that name is found, or if an error occurs.
4749 */
4750static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4751{
4752 if (rbd_dev->image_format == 1)
4753 return rbd_v1_snap_id_by_name(rbd_dev, name);
4754
4755 return rbd_v2_snap_id_by_name(rbd_dev, name);
4756}
4757
9e15b77d 4758/*
04077599
ID
4759 * An image being mapped will have everything but the snap id.
4760 */
4761static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4762{
4763 struct rbd_spec *spec = rbd_dev->spec;
4764
4765 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4766 rbd_assert(spec->image_id && spec->image_name);
4767 rbd_assert(spec->snap_name);
4768
4769 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4770 u64 snap_id;
4771
4772 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4773 if (snap_id == CEPH_NOSNAP)
4774 return -ENOENT;
4775
4776 spec->snap_id = snap_id;
4777 } else {
4778 spec->snap_id = CEPH_NOSNAP;
4779 }
4780
4781 return 0;
4782}
4783
4784/*
4785 * A parent image will have all ids but none of the names.
e1d4213f 4786 *
04077599
ID
4787 * All names in an rbd spec are dynamically allocated. It's OK if we
4788 * can't figure out the name for an image id.
9e15b77d 4789 */
04077599 4790static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 4791{
2e9f7f1c
AE
4792 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4793 struct rbd_spec *spec = rbd_dev->spec;
4794 const char *pool_name;
4795 const char *image_name;
4796 const char *snap_name;
9e15b77d
AE
4797 int ret;
4798
04077599
ID
4799 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4800 rbd_assert(spec->image_id);
4801 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 4802
2e9f7f1c 4803 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4804
2e9f7f1c
AE
4805 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4806 if (!pool_name) {
4807 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4808 return -EIO;
4809 }
2e9f7f1c
AE
4810 pool_name = kstrdup(pool_name, GFP_KERNEL);
4811 if (!pool_name)
9e15b77d
AE
4812 return -ENOMEM;
4813
4814 /* Fetch the image name; tolerate failure here */
4815
2e9f7f1c
AE
4816 image_name = rbd_dev_image_name(rbd_dev);
4817 if (!image_name)
06ecc6cb 4818 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4819
04077599 4820 /* Fetch the snapshot name */
9e15b77d 4821
2e9f7f1c 4822 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
4823 if (IS_ERR(snap_name)) {
4824 ret = PTR_ERR(snap_name);
9e15b77d 4825 goto out_err;
2e9f7f1c
AE
4826 }
4827
4828 spec->pool_name = pool_name;
4829 spec->image_name = image_name;
4830 spec->snap_name = snap_name;
9e15b77d
AE
4831
4832 return 0;
04077599 4833
9e15b77d 4834out_err:
2e9f7f1c
AE
4835 kfree(image_name);
4836 kfree(pool_name);
9e15b77d
AE
4837 return ret;
4838}
4839
cc4a38bd 4840static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4841{
4842 size_t size;
4843 int ret;
4844 void *reply_buf;
4845 void *p;
4846 void *end;
4847 u64 seq;
4848 u32 snap_count;
4849 struct ceph_snap_context *snapc;
4850 u32 i;
4851
4852 /*
4853 * We'll need room for the seq value (maximum snapshot id),
4854 * snapshot count, and array of that many snapshot ids.
4855 * For now we have a fixed upper limit on the number we're
4856 * prepared to receive.
4857 */
4858 size = sizeof (__le64) + sizeof (__le32) +
4859 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4860 reply_buf = kzalloc(size, GFP_KERNEL);
4861 if (!reply_buf)
4862 return -ENOMEM;
4863
ecd4a68a
ID
4864 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4865 &rbd_dev->header_oloc, "get_snapcontext",
4866 NULL, 0, reply_buf, size);
36be9a76 4867 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4868 if (ret < 0)
4869 goto out;
4870
35d489f9 4871 p = reply_buf;
57385b51
AE
4872 end = reply_buf + ret;
4873 ret = -ERANGE;
35d489f9
AE
4874 ceph_decode_64_safe(&p, end, seq, out);
4875 ceph_decode_32_safe(&p, end, snap_count, out);
4876
4877 /*
4878 * Make sure the reported number of snapshot ids wouldn't go
4879 * beyond the end of our buffer. But before checking that,
4880 * make sure the computed size of the snapshot context we
4881 * allocate is representable in a size_t.
4882 */
4883 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4884 / sizeof (u64)) {
4885 ret = -EINVAL;
4886 goto out;
4887 }
4888 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4889 goto out;
468521c1 4890 ret = 0;
35d489f9 4891
812164f8 4892 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4893 if (!snapc) {
4894 ret = -ENOMEM;
4895 goto out;
4896 }
35d489f9 4897 snapc->seq = seq;
35d489f9
AE
4898 for (i = 0; i < snap_count; i++)
4899 snapc->snaps[i] = ceph_decode_64(&p);
4900
49ece554 4901 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4902 rbd_dev->header.snapc = snapc;
4903
4904 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4905 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4906out:
4907 kfree(reply_buf);
4908
57385b51 4909 return ret;
35d489f9
AE
4910}
4911
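/*
 * Editorial sketch (read off the decode calls above, not part of the
 * original source): the "get_snapcontext" reply is decoded as
 *
 *   __le64 seq;                - maximum snapshot id so far
 *   __le32 snap_count;
 *   __le64 snaps[snap_count];  - snapshot ids
 */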
54cac61f
AE
4912static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4913 u64 snap_id)
b8b1e2db
AE
4914{
4915 size_t size;
4916 void *reply_buf;
54cac61f 4917 __le64 snapid;
b8b1e2db
AE
4918 int ret;
4919 void *p;
4920 void *end;
b8b1e2db
AE
4921 char *snap_name;
4922
4923 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4924 reply_buf = kmalloc(size, GFP_KERNEL);
4925 if (!reply_buf)
4926 return ERR_PTR(-ENOMEM);
4927
54cac61f 4928 snapid = cpu_to_le64(snap_id);
ecd4a68a
ID
4929 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4930 &rbd_dev->header_oloc, "get_snapshot_name",
4931 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4932 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4933 if (ret < 0) {
4934 snap_name = ERR_PTR(ret);
b8b1e2db 4935 goto out;
f40eb349 4936 }
b8b1e2db
AE
4937
4938 p = reply_buf;
f40eb349 4939 end = reply_buf + ret;
e5c35534 4940 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4941 if (IS_ERR(snap_name))
b8b1e2db 4942 goto out;
b8b1e2db 4943
f40eb349 4944 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4945 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4946out:
4947 kfree(reply_buf);
4948
f40eb349 4949 return snap_name;
b8b1e2db
AE
4950}
4951
2df3fac7 4952static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4953{
2df3fac7 4954 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4955 int ret;
117973fb 4956
1617e40c
JD
4957 ret = rbd_dev_v2_image_size(rbd_dev);
4958 if (ret)
cfbf6377 4959 return ret;
1617e40c 4960
2df3fac7
AE
4961 if (first_time) {
4962 ret = rbd_dev_v2_header_onetime(rbd_dev);
4963 if (ret)
cfbf6377 4964 return ret;
2df3fac7
AE
4965 }
4966
cc4a38bd 4967 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
4968 if (ret && first_time) {
4969 kfree(rbd_dev->header.object_prefix);
4970 rbd_dev->header.object_prefix = NULL;
4971 }
117973fb
AE
4972
4973 return ret;
4974}
4975
a720ae09
ID
4976static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4977{
4978 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4979
4980 if (rbd_dev->image_format == 1)
4981 return rbd_dev_v1_header_info(rbd_dev);
4982
4983 return rbd_dev_v2_header_info(rbd_dev);
4984}
4985
e28fff26
AE
4986/*
4987 * Skips over white space at *buf, and updates *buf to point to the
4988 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4989 * the token (string of non-white space characters) found. Note
4990 * that *buf must be terminated with '\0'.
e28fff26
AE
4991 */
4992static inline size_t next_token(const char **buf)
4993{
4994 /*
4995 * These are the characters that produce nonzero for
4996 * isspace() in the "C" and "POSIX" locales.
4997 */
4998 const char *spaces = " \f\n\r\t\v";
4999
5000 *buf += strspn(*buf, spaces); /* Find start of token */
5001
5002 return strcspn(*buf, spaces); /* Return token length */
5003}
5004
ea3352f4
AE
5005/*
5006 * Finds the next token in *buf, dynamically allocates a buffer big
5007 * enough to hold a copy of it, and copies the token into the new
5008 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5009 * that a duplicate buffer is created even for a zero-length token.
5010 *
5011 * Returns a pointer to the newly-allocated duplicate, or a null
5012 * pointer if memory for the duplicate was not available. If
5013 * the lenp argument is a non-null pointer, the length of the token
5014 * (not including the '\0') is returned in *lenp.
5015 *
5016 * If successful, the *buf pointer will be updated to point beyond
5017 * the end of the found token.
5018 *
5019 * Note: uses GFP_KERNEL for allocation.
5020 */
5021static inline char *dup_token(const char **buf, size_t *lenp)
5022{
5023 char *dup;
5024 size_t len;
5025
5026 len = next_token(buf);
4caf35f9 5027 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5028 if (!dup)
5029 return NULL;
ea3352f4
AE
5030 *(dup + len) = '\0';
5031 *buf += len;
5032
5033 if (lenp)
5034 *lenp = len;
5035
5036 return dup;
5037}
5038
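/*
 * Editorial sketch (hypothetical helper, not part of the original
 * source): pulling two tokens out of a buffer with dup_token().
 */
static int example_parse_pool_and_image(const char *buf)
{
	char *pool, *image;

	pool = dup_token(&buf, NULL);	/* e.g. "rbd" */
	if (!pool)
		return -ENOMEM;
	image = dup_token(&buf, NULL);	/* e.g. "foo"; "" if buf exhausted */
	if (!image) {
		kfree(pool);
		return -ENOMEM;
	}
	/* ... use pool/image ... */
	kfree(image);
	kfree(pool);
	return 0;
}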
a725f65e 5039/*
859c31df
AE
5040 * Parse the options provided for an "rbd add" (i.e., rbd image
5041 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5042 * and the data written is passed here via a NUL-terminated buffer.
5043 * Returns 0 if successful or an error code otherwise.
d22f76e7 5044 *
859c31df
AE
5045 * The information extracted from these options is recorded in
5046 * the other parameters which return dynamically-allocated
5047 * structures:
5048 * ceph_opts
5049 * The address of a pointer that will refer to a ceph options
5050 * structure. Caller must release the returned pointer using
5051 * ceph_destroy_options() when it is no longer needed.
5052 * rbd_opts
5053 * Address of an rbd options pointer. Fully initialized by
5054 * this function; caller must release with kfree().
5055 * spec
5056 * Address of an rbd image specification pointer. Fully
5057 * initialized by this function based on parsed options.
5058 * Caller must release with rbd_spec_put().
5059 *
5060 * The options passed take this form:
5061 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5062 * where:
5063 * <mon_addrs>
5064 * A comma-separated list of one or more monitor addresses.
5065 * A monitor address is an ip address, optionally followed
5066 * by a port number (separated by a colon).
5067 * I.e.: ip1[:port1][,ip2[:port2]...]
5068 * <options>
5069 * A comma-separated list of ceph and/or rbd options.
5070 * <pool_name>
5071 * The name of the rados pool containing the rbd image.
5072 * <image_name>
5073 * The name of the image in that pool to map.
5074 * <snap_name>
5075 * An optional snapshot name. If provided, the mapping will
5076 * present data from the image at the time that snapshot was
5077 * created. The image head is used if no snapshot name is
5078 * provided. Snapshot mappings are always read-only.
a725f65e 5079 */
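/*
 * Editorial example (illustrative values, not part of the original
 * source): a write such as
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd foo snap1
 *
 * to /sys/bus/rbd/add maps snapshot "snap1" of image "foo" in pool
 * "rbd"; omitting "snap1" maps the image head read/write.
 */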
859c31df 5080static int rbd_add_parse_args(const char *buf,
dc79b113 5081 struct ceph_options **ceph_opts,
859c31df
AE
5082 struct rbd_options **opts,
5083 struct rbd_spec **rbd_spec)
e28fff26 5084{
d22f76e7 5085 size_t len;
859c31df 5086 char *options;
0ddebc0c 5087 const char *mon_addrs;
ecb4dc22 5088 char *snap_name;
0ddebc0c 5089 size_t mon_addrs_size;
859c31df 5090 struct rbd_spec *spec = NULL;
4e9afeba 5091 struct rbd_options *rbd_opts = NULL;
859c31df 5092 struct ceph_options *copts;
dc79b113 5093 int ret;
e28fff26
AE
5094
5095 /* The first four tokens are required */
5096
7ef3214a 5097 len = next_token(&buf);
4fb5d671
AE
5098 if (!len) {
5099 rbd_warn(NULL, "no monitor address(es) provided");
5100 return -EINVAL;
5101 }
0ddebc0c 5102 mon_addrs = buf;
f28e565a 5103 mon_addrs_size = len + 1;
7ef3214a 5104 buf += len;
a725f65e 5105
dc79b113 5106 ret = -EINVAL;
f28e565a
AE
5107 options = dup_token(&buf, NULL);
5108 if (!options)
dc79b113 5109 return -ENOMEM;
4fb5d671
AE
5110 if (!*options) {
5111 rbd_warn(NULL, "no options provided");
5112 goto out_err;
5113 }
e28fff26 5114
859c31df
AE
5115 spec = rbd_spec_alloc();
5116 if (!spec)
f28e565a 5117 goto out_mem;
859c31df
AE
5118
5119 spec->pool_name = dup_token(&buf, NULL);
5120 if (!spec->pool_name)
5121 goto out_mem;
4fb5d671
AE
5122 if (!*spec->pool_name) {
5123 rbd_warn(NULL, "no pool name provided");
5124 goto out_err;
5125 }
e28fff26 5126
69e7a02f 5127 spec->image_name = dup_token(&buf, NULL);
859c31df 5128 if (!spec->image_name)
f28e565a 5129 goto out_mem;
4fb5d671
AE
5130 if (!*spec->image_name) {
5131 rbd_warn(NULL, "no image name provided");
5132 goto out_err;
5133 }
d4b125e9 5134
f28e565a
AE
5135 /*
5136 * Snapshot name is optional; default is to use "-"
5137 * (indicating the head/no snapshot).
5138 */
3feeb894 5139 len = next_token(&buf);
820a5f3e 5140 if (!len) {
3feeb894
AE
5141 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5142 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5143 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5144 ret = -ENAMETOOLONG;
f28e565a 5145 goto out_err;
849b4260 5146 }
ecb4dc22
AE
5147 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5148 if (!snap_name)
f28e565a 5149 goto out_mem;
ecb4dc22
AE
5150 *(snap_name + len) = '\0';
5151 spec->snap_name = snap_name;
e5c35534 5152
0ddebc0c 5153 /* Initialize all rbd options to the defaults */
e28fff26 5154
4e9afeba
AE
5155 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5156 if (!rbd_opts)
5157 goto out_mem;
5158
5159 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5160 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5161 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5162 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d22f76e7 5163
859c31df 5164 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5165 mon_addrs + mon_addrs_size - 1,
4e9afeba 5166 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5167 if (IS_ERR(copts)) {
5168 ret = PTR_ERR(copts);
dc79b113
AE
5169 goto out_err;
5170 }
859c31df
AE
5171 kfree(options);
5172
5173 *ceph_opts = copts;
4e9afeba 5174 *opts = rbd_opts;
859c31df 5175 *rbd_spec = spec;
0ddebc0c 5176
dc79b113 5177 return 0;
f28e565a 5178out_mem:
dc79b113 5179 ret = -ENOMEM;
d22f76e7 5180out_err:
859c31df
AE
5181 kfree(rbd_opts);
5182 rbd_spec_put(spec);
f28e565a 5183 kfree(options);
d22f76e7 5184
dc79b113 5185 return ret;
a725f65e
AE
5186}
5187
30ba1f02
ID
5188/*
5189 * Return pool id (>= 0) or a negative error code.
5190 */
5191static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5192{
a319bf56 5193 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5194 u64 newest_epoch;
30ba1f02
ID
5195 int tries = 0;
5196 int ret;
5197
5198again:
5199 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5200 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5201 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5202 &newest_epoch);
30ba1f02
ID
5203 if (ret < 0)
5204 return ret;
5205
5206 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5207 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5208 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5209 newest_epoch,
5210 opts->mount_timeout);
30ba1f02
ID
5211 goto again;
5212 } else {
5213 /* the osdmap we have is new enough */
5214 return -ENOENT;
5215 }
5216 }
5217
5218 return ret;
5219}
5220
e010dd0a
ID
5221static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5222{
5223 down_write(&rbd_dev->lock_rwsem);
5224 if (__rbd_is_lock_owner(rbd_dev))
5225 rbd_unlock(rbd_dev);
5226 up_write(&rbd_dev->lock_rwsem);
5227}
5228
5229static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5230{
5231 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5232 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5233 return -EINVAL;
5234 }
5235
5236 /* FIXME: "rbd map --exclusive" should be interruptible */
5237 down_read(&rbd_dev->lock_rwsem);
5238 rbd_wait_state_locked(rbd_dev);
5239 up_read(&rbd_dev->lock_rwsem);
5240 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5241 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5242 return -EROFS;
5243 }
5244
5245 return 0;
5246}
5247
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	CEPH_DEFINE_OID_ONSTACK(oid);
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
			       rbd_dev->spec->image_name);
	if (ret)
		return ret;

	dout("rbd id object name is %s\n", oid.name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				  "get_id", NULL, 0,
				  response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret >= 0) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						       NULL, GFP_NOIO);
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	ceph_oid_destroy(&oid);
	return ret;
}

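/*
 * To make the above concrete (example names only): for a format 2
 * image created as "foo", the id object queried above is "rbd_id.foo"
 * (RBD_ID_PREFIX followed by the image name), and its "get_id" class
 * method returns an id string such as "1014b2ae8944a".  A format 1
 * image has no id object at all, so the -ENOENT branch records an
 * empty image_id instead.
 */
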
/*
 * Undo whatever state changes are made by a v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
		ret = rbd_dev_v2_data_pool(rbd_dev);
		if (ret)
			goto out_err;
	}

	rbd_init_layout(rbd_dev);
	return 0;

out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;
	return ret;
}

/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}

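/*
 * Example: mapping a clone whose ancestry is image <- snap <- clone
 * <- snap <- clone recurses through here once per ancestor; a chain
 * deeper than RBD_MAX_PARENT_CHAIN_LEN (16) is refused above to
 * bound the recursion.
 */
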
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	rbd_free_disk(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);

	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	if (ret)
		goto err_out_mapping;

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);
	return 0;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

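/*
 * In single_major mode all images share one major; assuming
 * rbd_dev_id_to_minor() shifts by RBD_SINGLE_MAJOR_PART_SHIFT (4),
 * each device gets a block of 16 minors, e.g. dev_id 2 starts at
 * minor 32 with minors 33-47 left for its partitions.
 */
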
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

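/*
 * Illustrative header object names (example image "foo" with id
 * "1014b2ae8944a"): a format 1 image uses "foo.rbd" (image name plus
 * RBD_SUFFIX), while a format 2 image uses "rbd_header.1014b2ae8944a"
 * (RBD_HEADER_PREFIX plus the image id).
 */
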
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	if (rbd_dev->opts)
		rbd_unregister_watch(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;

		/*
		 * Need to warn users if this image is the one being
		 * mapped and has a parent.
		 */
		if (!depth && rbd_dev->parent_spec)
			rbd_warn(rbd_dev,
				 "WARNING: kernel layering is EXPERIMENTAL!");
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
	     rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	/* If we are mapping a snapshot it must be marked read-only */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		rbd_dev->opts->read_only = true;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
		goto err_out_image_probe;

	if (rbd_dev->opts->exclusive) {
		rc = rbd_add_acquire_lock(rbd_dev);
		if (rc)
			goto err_out_device_setup;
	}

	/* Everything's ready.  Announce the disk to the world. */

	rc = device_add(&rbd_dev->dev);
	if (rc)
		goto err_out_image_lock;

	add_disk(rbd_dev->disk);
	/* see rbd_init_disk() */
	blk_put_queue(rbd_dev->disk->queue);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);
	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_image_lock:
	rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
	rbd_dev_device_release(rbd_dev);
err_out_image_probe:
	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		rbd_dev_destroy(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

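/*
 * Walk-through: for a chain rbd_dev -> A -> B, the inner loop first
 * descends to B (the parent with no grandparent) and releases it,
 * then the next outer iteration releases A, tearing the chain down
 * deepest-ancestor first.
 */
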
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool already = false;
	bool force = false;
	int ret;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
						   &rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	if (force) {
		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	del_gendisk(rbd_dev->disk);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);
	device_del(&rbd_dev->dev);

	rbd_dev_image_unlock(rbd_dev);
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	rbd_dev_destroy(rbd_dev);
	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

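/*
 * Typical interaction with these control files (illustrative values;
 * the authoritative format is in Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *   # map pool "rbd", image "myimage", getting /dev/rbd<id>
 *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" > /sys/bus/rbd/add
 *
 *   # unmap device id 0, forcing it even if still open
 *   echo "0 force" > /sys/bus/rbd/remove
 */
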
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	return 0;

out_err:
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");