/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

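/*
 * Illustrative sketch (not part of the upstream file): these helpers
 * implement a saturating reference count -- once it has dropped to 0
 * it can never be re-incremented, and it refuses to pass INT_MAX.
 * rbd uses this pattern for rbd_dev->parent_ref further below, roughly:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		// parent still usable, safe to send I/O to it
 *		...
 *		if (atomic_dec_return_safe(&rbd_dev->parent_ref) == 0)
 *			rbd_dev_unparent(rbd_dev);	// last reference
 *	}
 *
 * Pinning the count at 0 means parent teardown cannot race with a new
 * reference being taken.
 */
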
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

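/*
 * Illustrative sketch (not part of the upstream file; the names and
 * ids below are made up): for a mapping of "mypool/myimage@mysnap",
 * discovery would leave the spec looking roughly like
 *
 *	spec->pool_name  = "mypool";	spec->pool_id  = 42;
 *	spec->image_name = "myimage";	spec->image_id = "1022ae8944a";
 *	spec->snap_name  = "mysnap";	spec->snap_id  = 5;
 *
 * Only the (pool_id, image_id, snap_id) tuple is needed to identify
 * the image; the names are kept around for sysfs and log messages.
 */
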
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *        |     ^                              |
 *        v     \------------------------------/
 *      done
 *        ^
 *        |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

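/*
 * Illustrative walk-through (not part of the upstream file): a write
 * to an object of a cloned (layered) image that has not been copied
 * up yet moves through the states roughly as follows:
 *
 *	RBD_OBJ_WRITE_GUARD	guarded write sent; the object does not
 *				exist yet, so the OSD fails it with
 *				-ENOENT ("need copyup")
 *	RBD_OBJ_WRITE_COPYUP	parent data is read in and a copyup +
 *				write is resubmitted
 *	done			the copyup request completes
 *
 * The same write against an image with no parent starts and finishes
 * in RBD_OBJ_WRITE_FLAT.
 */
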
struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;	/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64	size;
	u64	features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

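/*
 * Illustrative sketch (not part of the upstream file): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, the low 4 bits of a minor number
 * select the partition and the remaining bits select the device:
 *
 *	rbd_dev_id_to_minor(0) == 0	// rbd0, whole device
 *	rbd_dev_id_to_minor(1) == 16	// rbd1; minors 17..31 are
 *					// partitions rbd1p1..rbd1p15
 *	minor_to_rbd_dev_id(21) == 1	// minor 21 belongs to rbd1
 *
 * Hence single-major mode supports up to 16 minors (the device plus
 * 15 partitions) per rbd device.
 */
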
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
# define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};

struct rbd_options {
	int		queue_depth;
	unsigned long	lock_timeout;
	bool		read_only;
	bool		lock_on_read;
	bool		exclusive;
	bool		trim;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		rbd_opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	case Opt_notrim:
		rbd_opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

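/*
 * Illustrative sketch (not part of the upstream file): this callback
 * runs once per comma-separated token of the options string passed at
 * map time, so a hypothetical
 *
 *	# rbd map mypool/myimage -o queue_depth=128,lock_on_read,notrim
 *
 * results in three calls and leaves rbd_opts with queue_depth == 128,
 * lock_on_read == true and trim == false; every other field keeps its
 * RBD_*_DEFAULT value.
 */
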
static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client
 *
 * Takes and releases rbd_client_list_lock to remove the client from
 * the list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static int wait_for_latest_osdmap(struct ceph_client *client)
{
	u64 newest_epoch;
	int ret;

	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	if (client->osdc.osdmap->epoch >= newest_epoch)
		return 0;

	ceph_osdc_maybe_request_map(&client->osdc);
	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
				     client->options->mount_timeout);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = wait_for_latest_osdmap(rbdc->client);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

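/*
 * Illustrative arithmetic (not part of the upstream file): on a
 * 32-bit host SIZE_MAX is ~4 GiB, so the first check above caps
 * snap_count at roughly (4 GiB - sizeof(struct ceph_snap_context)) / 8
 * snapshot ids; the second check then requires that whatever size_t
 * budget remains still covers snap_names_len.  Together they keep the
 * snapshot header size computations from overflowing.
 */
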
/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

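/*
 * Illustrative sketch (not part of the upstream file): obj_order is
 * the log2 of the object size, so the common default order of 22
 * gives rbd_obj_bytes() == 1U << 22 == 4 MiB; a 1 GiB image is then
 * striped across 256 RADOS objects.
 */
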
static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
			sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

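/*
 * Illustrative sketch (not part of the upstream file): the array is
 * descending, which is why the comparator above inverts the usual
 * ordering.  For a hypothetical snapc->snaps of { 12, 8, 3 }:
 *
 *	rbd_dev_snap_index(rbd_dev, 8) == 1
 *	rbd_dev_snap_index(rbd_dev, 5) == BAD_SNAP_INDEX
 *
 * A plain ascending comparator would steer bsearch() into the wrong
 * half of the array.
 */
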
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
			 u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			     u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->obj_request_count++;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

3da691bf 1385 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
b9434c5b 1386
43df3d35
ID
1387 return !obj_req->ex.oe_off &&
1388 obj_req->ex.oe_len == rbd_dev->layout.object_size;
6e2a4505
AE
1389}
1390
3da691bf 1391static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
bf0d5f50 1392{
3da691bf 1393 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
bf0d5f50 1394
43df3d35 1395 return obj_req->ex.oe_off + obj_req->ex.oe_len ==
3da691bf 1396 rbd_dev->layout.object_size;
0dcc685e
ID
1397}
1398
86bd7998 1399static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
bf0d5f50 1400{
86bd7998
ID
1401 return ceph_file_extents_bytes(obj_req->img_extents,
1402 obj_req->num_img_extents);
bf0d5f50
AE
1403}
1404
3da691bf 1405static bool rbd_img_is_write(struct rbd_img_request *img_req)
bf0d5f50 1406{
9bb0248d 1407 switch (img_req->op_type) {
3da691bf
ID
1408 case OBJ_OP_READ:
1409 return false;
1410 case OBJ_OP_WRITE:
1411 case OBJ_OP_DISCARD:
1412 return true;
1413 default:
c6244b3b 1414 BUG();
3da691bf 1415 }
90e98c52
GZ
1416}
1417
static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		rbd_assert(0);
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

1548
fb65d228
AE
1549/* It's OK to call this for a device with no parent */
1550
1551static void rbd_spec_put(struct rbd_spec *spec);
1552static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1553{
1554 rbd_dev_remove_parent(rbd_dev);
1555 rbd_spec_put(rbd_dev->parent_spec);
1556 rbd_dev->parent_spec = NULL;
1557 rbd_dev->parent_overlap = 0;
1558}
1559
a2acd00e
AE
1560/*
1561 * Parent image reference counting is used to determine when an
1562 * image's parent fields can be safely torn down--after there are no
1563 * more in-flight requests to the parent image. When the last
1564 * reference is dropped, cleaning them up is safe.
1565 */
1566static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1567{
1568 int counter;
1569
1570 if (!rbd_dev->parent_spec)
1571 return;
1572
1573 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1574 if (counter > 0)
1575 return;
1576
1577 /* Last reference; clean up parent data structures */
1578
1579 if (!counter)
1580 rbd_dev_unparent(rbd_dev);
1581 else
9584d508 1582 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
1583}
1584
1585/*
1586 * If an image has a non-zero parent overlap, get a reference to its
1587 * parent.
1588 *
1589 * Returns true if the rbd device has a parent with a non-zero
1590 * overlap and a reference for it was successfully taken, or
1591 * false otherwise.
1592 */
1593static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1594{
ae43e9d0 1595 int counter = 0;
a2acd00e
AE
1596
1597 if (!rbd_dev->parent_spec)
1598 return false;
1599
ae43e9d0
ID
1600 down_read(&rbd_dev->header_rwsem);
1601 if (rbd_dev->parent_overlap)
1602 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1603 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
1604
1605 if (counter < 0)
9584d508 1606 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 1607
ae43e9d0 1608 return counter > 0;
a2acd00e
AE
1609}
1610
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}

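/*
 * Illustrative sketch (not part of the upstream file): with a
 * hypothetical parent overlap of 100 and img_extents of
 * { {0,50}, {80,40}, {150,10} }, prune_extents() drops {150,10}
 * entirely (it starts at or beyond the overlap) and trims {80,40}
 * down to {80,20}, leaving { {0,50}, {80,20} } -- only the portion
 * of the request that parent data can actually back.
 */
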
/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

3da691bf 1718static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1217857f 1719{
ecc633ca 1720 switch (obj_req->img_request->data_type) {
3da691bf
ID
1721 case OBJ_REQUEST_BIO:
1722 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1723 &obj_req->bio_pos,
43df3d35 1724 obj_req->ex.oe_len);
3da691bf
ID
1725 break;
1726 case OBJ_REQUEST_BVECS:
afb97888 1727 case OBJ_REQUEST_OWN_BVECS:
3da691bf 1728 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
43df3d35 1729 obj_req->ex.oe_len);
afb97888 1730 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
3da691bf
ID
1731 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1732 &obj_req->bvec_pos);
1733 break;
1734 default:
1735 rbd_assert(0);
1217857f 1736 }
3da691bf 1737}
1217857f 1738
3da691bf
ID
1739static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1740{
a162b308 1741 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
3da691bf
ID
1742 if (!obj_req->osd_req)
1743 return -ENOMEM;
2a842aca 1744
3da691bf 1745 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
43df3d35 1746 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1747 rbd_osd_req_setup_data(obj_req, 0);
7ad18afa 1748
3da691bf
ID
1749 rbd_osd_req_format_read(obj_req);
1750 return 0;
1751}
1752
1753static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1754 unsigned int which)
1755{
1756 struct page **pages;
8b3e1a56 1757
3da691bf
ID
1758 /*
1759 * The response data for a STAT call consists of:
1760 * le64 length;
1761 * struct {
1762 * le32 tv_sec;
1763 * le32 tv_nsec;
1764 * } mtime;
1765 */
1766 pages = ceph_alloc_page_vector(1, GFP_NOIO);
1767 if (IS_ERR(pages))
1768 return PTR_ERR(pages);
1769
1770 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1771 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1772 8 + sizeof(struct ceph_timespec),
1773 0, false, true);
1774 return 0;
1217857f
AE
1775}
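
 The 16-byte STAT reply described in the comment above is plain little-endian
 data and can be picked apart with two helpers. A hedged user-space sketch
 assuming exactly that layout; the sample bytes are invented:

	#include <stdio.h>
	#include <stdint.h>

	static uint64_t get_le64(const uint8_t *p)
	{
		uint64_t v = 0;
		int i;

		for (i = 7; i >= 0; i--)
			v = (v << 8) | p[i];
		return v;
	}

	static uint32_t get_le32(const uint8_t *p)
	{
		return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
		       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
	}

	int main(void)
	{
		/* hypothetical reply: length 4096, mtime 1500000000.0 */
		uint8_t buf[16] = { 0 };

		buf[0] = 0x00; buf[1] = 0x10;		/* le64 length = 4096 */
		buf[8] = 0x00; buf[9] = 0x2f;		/* le32 tv_sec ... */
		buf[10] = 0x68; buf[11] = 0x59;		/* ... = 1500000000 */

		printf("length=%llu mtime=%u.%09u\n",
		       (unsigned long long)get_le64(buf),
		       get_le32(buf + 8), get_le32(buf + 12));
		return 0;
	}
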
1776
3da691bf
ID
1777static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1778 unsigned int which)
2169238d 1779{
3da691bf
ID
1780 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1781 u16 opcode;
2169238d 1782
3da691bf
ID
1783 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1784 rbd_dev->layout.object_size,
1785 rbd_dev->layout.object_size);
2169238d 1786
3da691bf
ID
1787 if (rbd_obj_is_entire(obj_req))
1788 opcode = CEPH_OSD_OP_WRITEFULL;
1789 else
1790 opcode = CEPH_OSD_OP_WRITE;
2169238d 1791
3da691bf 1792 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
43df3d35 1793 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1794 rbd_osd_req_setup_data(obj_req, which++);
2169238d 1795
3da691bf
ID
1796 rbd_assert(which == obj_req->osd_req->r_num_ops);
1797 rbd_osd_req_format_write(obj_req);
1798}
2169238d 1799
3da691bf
ID
1800static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1801{
3da691bf
ID
1802 unsigned int num_osd_ops, which = 0;
1803 int ret;
1804
86bd7998
ID
1805 /* reverse map the entire object onto the parent */
1806 ret = rbd_obj_calc_img_extents(obj_req, true);
1807 if (ret)
1808 return ret;
1809
1810 if (obj_req->num_img_extents) {
3da691bf
ID
1811 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1812 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1813 } else {
1814 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1815 num_osd_ops = 2; /* setallochint + write/writefull */
2169238d
AE
1816 }
1817
a162b308 1818 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1819 if (!obj_req->osd_req)
1820 return -ENOMEM;
2169238d 1821
86bd7998 1822 if (obj_req->num_img_extents) {
3da691bf
ID
1823 ret = __rbd_obj_setup_stat(obj_req, which++);
1824 if (ret)
1825 return ret;
1826 }
1827
1828 __rbd_obj_setup_write(obj_req, which);
1829 return 0;
2169238d
AE
1830}
1831
3da691bf
ID
1832static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1833 unsigned int which)
1834{
3b434a2a
JD
1835 u16 opcode;
1836
3da691bf 1837 if (rbd_obj_is_entire(obj_req)) {
86bd7998 1838 if (obj_req->num_img_extents) {
2bb1e56e
ID
1839 osd_req_op_init(obj_req->osd_req, which++,
1840 CEPH_OSD_OP_CREATE, 0);
3b434a2a
JD
1841 opcode = CEPH_OSD_OP_TRUNCATE;
1842 } else {
3da691bf
ID
1843 osd_req_op_init(obj_req->osd_req, which++,
1844 CEPH_OSD_OP_DELETE, 0);
1845 opcode = 0;
3b434a2a 1846 }
3da691bf
ID
1847 } else if (rbd_obj_is_tail(obj_req)) {
1848 opcode = CEPH_OSD_OP_TRUNCATE;
3b434a2a 1849 } else {
3da691bf 1850 opcode = CEPH_OSD_OP_ZERO;
3b434a2a
JD
1851 }
1852
3da691bf
ID
1853 if (opcode)
1854 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
43df3d35 1855 obj_req->ex.oe_off, obj_req->ex.oe_len,
3da691bf
ID
1856 0, 0);
1857
1858 rbd_assert(which == obj_req->osd_req->r_num_ops);
1859 rbd_osd_req_format_write(obj_req);
3b434a2a
JD
1860}
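
 The opcode selection above covers four cases: a whole-object discard with
 parent data creates an empty object and truncates it (deleting the object
 would let the parent's data show through again), a whole object with no
 parent is simply deleted, a discard running to the end of the object
 truncates, and anything else zeroes the range. A compact sketch of that
 table; the names are illustrative only:

	#include <stdio.h>
	#include <stdbool.h>

	/* illustrative mirror of __rbd_obj_setup_discard()'s opcode choice */
	static const char *discard_ops(bool entire, bool tail, bool has_parent)
	{
		if (entire)
			return has_parent ? "create + truncate" : "delete";
		if (tail)
			return "truncate";
		return "zero";
	}

	int main(void)
	{
		printf("entire, parent -> %s\n", discard_ops(true, false, true));
		printf("entire, flat   -> %s\n", discard_ops(true, false, false));
		printf("tail           -> %s\n", discard_ops(false, true, false));
		printf("middle         -> %s\n", discard_ops(false, false, false));
		return 0;
	}
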
1861
3da691bf 1862static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
bf0d5f50 1863{
3da691bf
ID
1864 unsigned int num_osd_ops, which = 0;
1865 int ret;
37206ee5 1866
86bd7998
ID
1867 /* reverse map the entire object onto the parent */
1868 ret = rbd_obj_calc_img_extents(obj_req, true);
1869 if (ret)
1870 return ret;
f1a4739f 1871
3da691bf
ID
1872 if (rbd_obj_is_entire(obj_req)) {
1873 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2bb1e56e
ID
1874 if (obj_req->num_img_extents)
1875 num_osd_ops = 2; /* create + truncate */
1876 else
1877 num_osd_ops = 1; /* delete */
3da691bf 1878 } else {
86bd7998 1879 if (obj_req->num_img_extents) {
3da691bf
ID
1880 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1881 num_osd_ops = 2; /* stat + truncate/zero */
1882 } else {
1883 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1884 num_osd_ops = 1; /* truncate/zero */
1885 }
f1a4739f
AE
1886 }
1887
a162b308 1888 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1889 if (!obj_req->osd_req)
1890 return -ENOMEM;
bf0d5f50 1891
86bd7998 1892 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
3da691bf
ID
1893 ret = __rbd_obj_setup_stat(obj_req, which++);
1894 if (ret)
1895 return ret;
1896 }
3b434a2a 1897
3da691bf
ID
1898 __rbd_obj_setup_discard(obj_req, which);
1899 return 0;
1900}
9d4df01f 1901
3da691bf
ID
1902/*
1903 * For each object request in @img_req, allocate an OSD request, add
1904 * individual OSD ops and prepare them for submission. The number of
1905 * OSD ops depends on op_type and the overlap point (if any).
1906 */
1907static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1908{
1909 struct rbd_obj_request *obj_req;
1910 int ret;
430c28c3 1911
3da691bf 1912 for_each_obj_request(img_req, obj_req) {
9bb0248d 1913 switch (img_req->op_type) {
3da691bf
ID
1914 case OBJ_OP_READ:
1915 ret = rbd_obj_setup_read(obj_req);
1916 break;
1917 case OBJ_OP_WRITE:
1918 ret = rbd_obj_setup_write(obj_req);
1919 break;
1920 case OBJ_OP_DISCARD:
1921 ret = rbd_obj_setup_discard(obj_req);
1922 break;
1923 default:
1924 rbd_assert(0);
1925 }
1926 if (ret)
1927 return ret;
bf0d5f50
AE
1928 }
1929
1930 return 0;
3da691bf 1931}
bf0d5f50 1932
5a237819
ID
1933union rbd_img_fill_iter {
1934 struct ceph_bio_iter bio_iter;
1935 struct ceph_bvec_iter bvec_iter;
1936};
bf0d5f50 1937
5a237819
ID
1938struct rbd_img_fill_ctx {
1939 enum obj_request_type pos_type;
1940 union rbd_img_fill_iter *pos;
1941 union rbd_img_fill_iter iter;
1942 ceph_object_extent_fn_t set_pos_fn;
afb97888
ID
1943 ceph_object_extent_fn_t count_fn;
1944 ceph_object_extent_fn_t copy_fn;
5a237819 1945};
bf0d5f50 1946
5a237819 1947static struct ceph_object_extent *alloc_object_extent(void *arg)
0eefd470 1948{
5a237819
ID
1949 struct rbd_img_request *img_req = arg;
1950 struct rbd_obj_request *obj_req;
0eefd470 1951
5a237819
ID
1952 obj_req = rbd_obj_request_create();
1953 if (!obj_req)
1954 return NULL;
2761713d 1955
5a237819
ID
1956 rbd_img_obj_request_add(img_req, obj_req);
1957 return &obj_req->ex;
1958}
0eefd470 1959
afb97888
ID
1960/*
1961 * While su != os && sc == 1 is technically not fancy (it's the same
1962 * layout as su == os && sc == 1), we can't use the nocopy path for it
1963 * because ->set_pos_fn() should be called only once per object.
1964 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1965 * treat su != os && sc == 1 as fancy.
1966 */
1967static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1968{
1969 return l->stripe_unit != l->object_size;
1970}
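
 For a concrete feel: default RBD images have su == os (e.g. both 4 MiB) with
 sc == 1 and take the nocopy path, while an image striped with e.g. su = 64 KiB
 over 4 MiB objects is fancy, as is the su != os && sc == 1 corner case the
 comment calls out. A trivial sketch with a stand-in layout struct:

	#include <stdio.h>
	#include <stdbool.h>
	#include <stdint.h>

	struct layout {		/* stand-in for struct ceph_file_layout */
		uint32_t stripe_unit;
		uint32_t stripe_count;
		uint32_t object_size;
	};

	static bool layout_is_fancy(const struct layout *l)
	{
		return l->stripe_unit != l->object_size;
	}

	int main(void)
	{
		struct layout plain = { 4u << 20, 1, 4u << 20 };	/* default */
		struct layout striped = { 64u << 10, 16, 4u << 20 };

		printf("plain:   %s\n", layout_is_fancy(&plain) ? "fancy" : "nocopy");
		printf("striped: %s\n", layout_is_fancy(&striped) ? "fancy" : "nocopy");
		return 0;
	}
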
0eefd470 1971
afb97888
ID
1972static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1973 struct ceph_file_extent *img_extents,
1974 u32 num_img_extents,
1975 struct rbd_img_fill_ctx *fctx)
1976{
1977 u32 i;
1978 int ret;
1979
1980 img_req->data_type = fctx->pos_type;
0eefd470
AE
1981
1982 /*
afb97888
ID
1983 * Create object requests and set each object request's starting
1984 * position in the provided bio (list) or bio_vec array.
0eefd470 1985 */
afb97888
ID
1986 fctx->iter = *fctx->pos;
1987 for (i = 0; i < num_img_extents; i++) {
1988 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
1989 img_extents[i].fe_off,
1990 img_extents[i].fe_len,
1991 &img_req->object_extents,
1992 alloc_object_extent, img_req,
1993 fctx->set_pos_fn, &fctx->iter);
1994 if (ret)
1995 return ret;
1996 }
0eefd470 1997
afb97888 1998 return __rbd_img_fill_request(img_req);
0eefd470
AE
1999}
2000
5a237819
ID
2001/*
2002 * Map a list of image extents to a list of object extents, create the
2003 * corresponding object requests (normally each to a different object,
2004 * but not always) and add them to @img_req. For each object request,
afb97888 2005 * set up its data descriptor to point to the corresponding chunk(s) of
5a237819
ID
2006 * @fctx->pos data buffer.
2007 *
afb97888
ID
2008 * Because ceph_file_to_extents() will merge adjacent object extents
2009 * together, each object request's data descriptor may point to multiple
2010 * different chunks of @fctx->pos data buffer.
2011 *
5a237819
ID
2012 * @fctx->pos data buffer is assumed to be large enough.
2013 */
2014static int rbd_img_fill_request(struct rbd_img_request *img_req,
2015 struct ceph_file_extent *img_extents,
2016 u32 num_img_extents,
2017 struct rbd_img_fill_ctx *fctx)
3d7efd18 2018{
afb97888
ID
2019 struct rbd_device *rbd_dev = img_req->rbd_dev;
2020 struct rbd_obj_request *obj_req;
5a237819
ID
2021 u32 i;
2022 int ret;
2023
afb97888
ID
2024 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2025 !rbd_layout_is_fancy(&rbd_dev->layout))
2026 return rbd_img_fill_request_nocopy(img_req, img_extents,
2027 num_img_extents, fctx);
3d7efd18 2028
afb97888 2029 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
0eefd470 2030
bbea1c1a 2031 /*
afb97888
ID
2032 * Create object requests and determine ->bvec_count for each object
 2033 * request. Note that the sum of ->bvec_count over all object requests
 2034 * may be greater than the number of bio_vecs in the provided bio (list)
2035 * or bio_vec array because when mapped, those bio_vecs can straddle
2036 * stripe unit boundaries.
bbea1c1a 2037 */
5a237819
ID
2038 fctx->iter = *fctx->pos;
2039 for (i = 0; i < num_img_extents; i++) {
afb97888 2040 ret = ceph_file_to_extents(&rbd_dev->layout,
5a237819
ID
2041 img_extents[i].fe_off,
2042 img_extents[i].fe_len,
2043 &img_req->object_extents,
2044 alloc_object_extent, img_req,
afb97888
ID
2045 fctx->count_fn, &fctx->iter);
2046 if (ret)
2047 return ret;
bbea1c1a 2048 }
0eefd470 2049
afb97888
ID
2050 for_each_obj_request(img_req, obj_req) {
2051 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2052 sizeof(*obj_req->bvec_pos.bvecs),
2053 GFP_NOIO);
2054 if (!obj_req->bvec_pos.bvecs)
2055 return -ENOMEM;
2056 }
0eefd470 2057
8785b1d4 2058 /*
afb97888
ID
2059 * Fill in each object request's private bio_vec array, splitting and
2060 * rearranging the provided bio_vecs in stripe unit chunks as needed.
8785b1d4 2061 */
afb97888
ID
2062 fctx->iter = *fctx->pos;
2063 for (i = 0; i < num_img_extents; i++) {
2064 ret = ceph_iterate_extents(&rbd_dev->layout,
2065 img_extents[i].fe_off,
2066 img_extents[i].fe_len,
2067 &img_req->object_extents,
2068 fctx->copy_fn, &fctx->iter);
5a237819
ID
2069 if (ret)
2070 return ret;
2071 }
3d7efd18 2072
5a237819
ID
2073 return __rbd_img_fill_request(img_req);
2074}
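
 For fancy layouts the function above makes three passes over the image
 extents: (1) create object requests and count how many bio_vecs each will
 need (a bio_vec straddling a stripe unit boundary counts twice), (2) allocate
 each request's private bio_vec array, (3) split and copy the bio_vecs in.
 A self-contained sketch of the count-then-copy pattern on plain byte
 segments, with a made-up 64 KiB stripe unit:

	#include <stdio.h>
	#include <stdint.h>
	#include <stdlib.h>

	#define STRIPE_UNIT (64u << 10)	/* hypothetical su */

	struct seg { uint64_t off, len; };	/* stand-in for a bio_vec */

	/* pass 1: count the chunks @s splits into at stripe unit boundaries */
	static unsigned int count_chunks(const struct seg *s)
	{
		uint64_t first = s->off / STRIPE_UNIT;
		uint64_t last = (s->off + s->len - 1) / STRIPE_UNIT;

		return (unsigned int)(last - first + 1);
	}

	int main(void)
	{
		struct seg segs[] = { { 60u << 10, 8u << 10 },
				      { 68u << 10, 4u << 10 } };
		unsigned int total = 0, n = 0, i;
		struct seg *chunks;

		for (i = 0; i < 2; i++)			/* pass 1: count */
			total += count_chunks(&segs[i]);

		chunks = calloc(total, sizeof(*chunks));	/* pass 2 */
		if (!chunks)
			return 1;

		for (i = 0; i < 2; i++) {		/* pass 3: split/copy */
			uint64_t off = segs[i].off, left = segs[i].len;

			while (left) {
				uint64_t room = STRIPE_UNIT - off % STRIPE_UNIT;
				uint64_t take = left < room ? left : room;

				chunks[n++] = (struct seg){ off, take };
				off += take;
				left -= take;
			}
		}

		printf("2 segments became %u chunks\n", total);
		for (i = 0; i < n; i++)
			printf("chunk %u: %llu~%llu\n", i,
			       (unsigned long long)chunks[i].off,
			       (unsigned long long)chunks[i].len);
		free(chunks);
		return 0;
	}
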
2075
2076static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2077 u64 off, u64 len)
2078{
2079 struct ceph_file_extent ex = { off, len };
2080 union rbd_img_fill_iter dummy;
2081 struct rbd_img_fill_ctx fctx = {
2082 .pos_type = OBJ_REQUEST_NODATA,
2083 .pos = &dummy,
2084 };
2085
2086 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2087}
2088
2089static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2090{
2091 struct rbd_obj_request *obj_req =
2092 container_of(ex, struct rbd_obj_request, ex);
2093 struct ceph_bio_iter *it = arg;
3d7efd18 2094
5a237819
ID
2095 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2096 obj_req->bio_pos = *it;
2097 ceph_bio_iter_advance(it, bytes);
2098}
3d7efd18 2099
afb97888
ID
2100static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2101{
2102 struct rbd_obj_request *obj_req =
2103 container_of(ex, struct rbd_obj_request, ex);
2104 struct ceph_bio_iter *it = arg;
0eefd470 2105
afb97888
ID
2106 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2107 ceph_bio_iter_advance_step(it, bytes, ({
2108 obj_req->bvec_count++;
2109 }));
0eefd470 2110
afb97888 2111}
0eefd470 2112
afb97888
ID
2113static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2114{
2115 struct rbd_obj_request *obj_req =
2116 container_of(ex, struct rbd_obj_request, ex);
2117 struct ceph_bio_iter *it = arg;
0eefd470 2118
afb97888
ID
2119 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2120 ceph_bio_iter_advance_step(it, bytes, ({
2121 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2122 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2123 }));
3d7efd18
AE
2124}
2125
5a237819
ID
2126static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2127 struct ceph_file_extent *img_extents,
2128 u32 num_img_extents,
2129 struct ceph_bio_iter *bio_pos)
2130{
2131 struct rbd_img_fill_ctx fctx = {
2132 .pos_type = OBJ_REQUEST_BIO,
2133 .pos = (union rbd_img_fill_iter *)bio_pos,
2134 .set_pos_fn = set_bio_pos,
afb97888
ID
2135 .count_fn = count_bio_bvecs,
2136 .copy_fn = copy_bio_bvecs,
5a237819 2137 };
3d7efd18 2138
5a237819
ID
2139 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2140 &fctx);
2141}
3d7efd18 2142
5a237819
ID
2143static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2144 u64 off, u64 len, struct bio *bio)
2145{
2146 struct ceph_file_extent ex = { off, len };
2147 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
3d7efd18 2148
5a237819
ID
2149 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2150}
a9e8ba2c 2151
5a237819
ID
2152static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2153{
2154 struct rbd_obj_request *obj_req =
2155 container_of(ex, struct rbd_obj_request, ex);
2156 struct ceph_bvec_iter *it = arg;
3d7efd18 2157
5a237819
ID
2158 obj_req->bvec_pos = *it;
2159 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2160 ceph_bvec_iter_advance(it, bytes);
2161}
3d7efd18 2162
afb97888
ID
2163static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2164{
2165 struct rbd_obj_request *obj_req =
2166 container_of(ex, struct rbd_obj_request, ex);
2167 struct ceph_bvec_iter *it = arg;
058aa991 2168
afb97888
ID
2169 ceph_bvec_iter_advance_step(it, bytes, ({
2170 obj_req->bvec_count++;
2171 }));
2172}
058aa991 2173
afb97888
ID
2174static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2175{
2176 struct rbd_obj_request *obj_req =
2177 container_of(ex, struct rbd_obj_request, ex);
2178 struct ceph_bvec_iter *it = arg;
3d7efd18 2179
afb97888
ID
2180 ceph_bvec_iter_advance_step(it, bytes, ({
2181 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2182 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2183 }));
3d7efd18
AE
2184}
2185
5a237819
ID
2186static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2187 struct ceph_file_extent *img_extents,
2188 u32 num_img_extents,
2189 struct ceph_bvec_iter *bvec_pos)
c5b5ef6c 2190{
5a237819
ID
2191 struct rbd_img_fill_ctx fctx = {
2192 .pos_type = OBJ_REQUEST_BVECS,
2193 .pos = (union rbd_img_fill_iter *)bvec_pos,
2194 .set_pos_fn = set_bvec_pos,
afb97888
ID
2195 .count_fn = count_bvecs,
2196 .copy_fn = copy_bvecs,
5a237819 2197 };
c5b5ef6c 2198
5a237819
ID
2199 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2200 &fctx);
2201}
c5b5ef6c 2202
5a237819
ID
2203static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2204 struct ceph_file_extent *img_extents,
2205 u32 num_img_extents,
2206 struct bio_vec *bvecs)
2207{
2208 struct ceph_bvec_iter it = {
2209 .bvecs = bvecs,
2210 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2211 num_img_extents) },
2212 };
c5b5ef6c 2213
5a237819
ID
2214 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2215 &it);
2216}
c5b5ef6c 2217
efbd1a11 2218static void rbd_img_request_submit(struct rbd_img_request *img_request)
bf0d5f50 2219{
bf0d5f50 2220 struct rbd_obj_request *obj_request;
c5b5ef6c 2221
37206ee5 2222 dout("%s: img %p\n", __func__, img_request);
c2e82414 2223
663ae2cc 2224 rbd_img_request_get(img_request);
efbd1a11 2225 for_each_obj_request(img_request, obj_request)
3da691bf 2226 rbd_obj_request_submit(obj_request);
c2e82414 2227
663ae2cc 2228 rbd_img_request_put(img_request);
c5b5ef6c
AE
2229}
2230
86bd7998 2231static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
c5b5ef6c 2232{
3da691bf
ID
2233 struct rbd_img_request *img_req = obj_req->img_request;
2234 struct rbd_img_request *child_img_req;
c5b5ef6c
AE
2235 int ret;
2236
e93aca0a
ID
2237 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2238 OBJ_OP_READ, NULL);
3da691bf 2239 if (!child_img_req)
710214e3
ID
2240 return -ENOMEM;
2241
e93aca0a
ID
2242 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2243 child_img_req->obj_request = obj_req;
a90bb0c1 2244
3da691bf 2245 if (!rbd_img_is_write(img_req)) {
ecc633ca 2246 switch (img_req->data_type) {
3da691bf 2247 case OBJ_REQUEST_BIO:
5a237819
ID
2248 ret = __rbd_img_fill_from_bio(child_img_req,
2249 obj_req->img_extents,
2250 obj_req->num_img_extents,
2251 &obj_req->bio_pos);
3da691bf
ID
2252 break;
2253 case OBJ_REQUEST_BVECS:
afb97888 2254 case OBJ_REQUEST_OWN_BVECS:
5a237819
ID
2255 ret = __rbd_img_fill_from_bvecs(child_img_req,
2256 obj_req->img_extents,
2257 obj_req->num_img_extents,
2258 &obj_req->bvec_pos);
3da691bf
ID
2259 break;
2260 default:
2261 rbd_assert(0);
2262 }
2263 } else {
5a237819
ID
2264 ret = rbd_img_fill_from_bvecs(child_img_req,
2265 obj_req->img_extents,
2266 obj_req->num_img_extents,
2267 obj_req->copyup_bvecs);
3da691bf
ID
2268 }
2269 if (ret) {
2270 rbd_img_request_put(child_img_req);
2271 return ret;
2272 }
2273
2274 rbd_img_request_submit(child_img_req);
2275 return 0;
2276}
2277
2278static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2279{
2280 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2281 int ret;
2282
2283 if (obj_req->result == -ENOENT &&
86bd7998
ID
2284 rbd_dev->parent_overlap && !obj_req->tried_parent) {
2285 /* reverse map this object extent onto the parent */
2286 ret = rbd_obj_calc_img_extents(obj_req, false);
3da691bf
ID
2287 if (ret) {
2288 obj_req->result = ret;
2289 return true;
2290 }
86bd7998
ID
2291
2292 if (obj_req->num_img_extents) {
2293 obj_req->tried_parent = true;
2294 ret = rbd_obj_read_from_parent(obj_req);
2295 if (ret) {
2296 obj_req->result = ret;
2297 return true;
2298 }
2299 return false;
2300 }
710214e3
ID
2301 }
2302
c5b5ef6c 2303 /*
3da691bf
ID
2304 * -ENOENT means a hole in the image -- zero-fill the entire
2305 * length of the request. A short read also implies zero-fill
 2306 * to the end of the request. In both cases we update the xferred
 2307 * count to indicate that the whole request was satisfied.
c5b5ef6c 2308 */
3da691bf 2309 if (obj_req->result == -ENOENT ||
43df3d35 2310 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
3da691bf
ID
2311 rbd_assert(!obj_req->xferred || !obj_req->result);
2312 rbd_obj_zero_range(obj_req, obj_req->xferred,
43df3d35 2313 obj_req->ex.oe_len - obj_req->xferred);
3da691bf 2314 obj_req->result = 0;
43df3d35 2315 obj_req->xferred = obj_req->ex.oe_len;
710214e3 2316 }
c5b5ef6c 2317
3da691bf
ID
2318 return true;
2319}
c5b5ef6c 2320
3da691bf
ID
2321/*
2322 * copyup_bvecs pages are never highmem pages
2323 */
2324static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2325{
2326 struct ceph_bvec_iter it = {
2327 .bvecs = bvecs,
2328 .iter = { .bi_size = bytes },
2329 };
c5b5ef6c 2330
3da691bf
ID
2331 ceph_bvec_iter_advance_step(&it, bytes, ({
2332 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2333 bv.bv_len))
2334 return false;
2335 }));
2336 return true;
c5b5ef6c
AE
2337}
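
 memchr_inv() is the in-kernel helper for "does this buffer contain anything
 but X". User space lacks it, but the same all-zero check can be done by
 testing the first byte and then comparing the buffer against itself shifted
 by one. A small sketch, assuming 4 KiB pages:

	#include <stdio.h>
	#include <string.h>
	#include <stdbool.h>

	/* user-space stand-in for memchr_inv(buf, 0, len) == NULL */
	static bool is_zero_buf(const unsigned char *buf, size_t len)
	{
		return len == 0 ||
		       (buf[0] == 0 && !memcmp(buf, buf + 1, len - 1));
	}

	int main(void)
	{
		unsigned char page[4096] = { 0 };

		printf("all-zero: %d\n", is_zero_buf(page, sizeof(page)));
		page[100] = 0xff;
		printf("dirtied:  %d\n", is_zero_buf(page, sizeof(page)));
		return 0;
	}
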
2338
3da691bf 2339static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
b454e36d 2340{
3da691bf 2341 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
70d045f6 2342
3da691bf
ID
2343 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2344 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2345 rbd_osd_req_destroy(obj_req->osd_req);
70d045f6 2346
b454e36d 2347 /*
3da691bf
ID
2348 * Create a copyup request with the same number of OSD ops as
 2349 * the original request. The original request was stat + op(s);
 2350 * the new copyup request will be copyup + the same op(s).
b454e36d 2351 */
a162b308 2352 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
2353 if (!obj_req->osd_req)
2354 return -ENOMEM;
b454e36d 2355
c622d226 2356 /*
3da691bf
ID
2357 * Only send non-zero copyup data to save some I/O and network
2358 * bandwidth -- zero copyup data is equivalent to the object not
2359 * existing.
c622d226 2360 */
3da691bf
ID
2361 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2362 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2363 bytes = 0;
2364 }
c622d226 2365
3da691bf
ID
2366 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2367 "copyup");
2368 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2369 obj_req->copyup_bvecs, bytes);
2370
9bb0248d 2371 switch (obj_req->img_request->op_type) {
3da691bf
ID
2372 case OBJ_OP_WRITE:
2373 __rbd_obj_setup_write(obj_req, 1);
2374 break;
2375 case OBJ_OP_DISCARD:
2376 rbd_assert(!rbd_obj_is_entire(obj_req));
2377 __rbd_obj_setup_discard(obj_req, 1);
2378 break;
2379 default:
2380 rbd_assert(0);
2381 }
70d045f6 2382
3da691bf 2383 rbd_obj_request_submit(obj_req);
3da691bf 2384 return 0;
70d045f6
ID
2385}
2386
7e07efb1 2387static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
70d045f6 2388{
7e07efb1 2389 u32 i;
b454e36d 2390
7e07efb1
ID
2391 rbd_assert(!obj_req->copyup_bvecs);
2392 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2393 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2394 sizeof(*obj_req->copyup_bvecs),
2395 GFP_NOIO);
2396 if (!obj_req->copyup_bvecs)
2397 return -ENOMEM;
b454e36d 2398
7e07efb1
ID
2399 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2400 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2401
2402 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2403 if (!obj_req->copyup_bvecs[i].bv_page)
2404 return -ENOMEM;
3d7efd18 2405
7e07efb1
ID
2406 obj_req->copyup_bvecs[i].bv_offset = 0;
2407 obj_req->copyup_bvecs[i].bv_len = len;
2408 obj_overlap -= len;
2409 }
b454e36d 2410
7e07efb1
ID
2411 rbd_assert(!obj_overlap);
2412 return 0;
b454e36d
AE
2413}
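
 Since copyup data always starts at object offset 0, calc_pages_for(0, len)
 reduces to a round-up division by PAGE_SIZE, and only the final bio_vec may
 be shorter than a page. A worked sketch of the loop above with a
 hypothetical 4 KiB page size and a 10 KiB object overlap:

	#include <stdio.h>
	#include <stdint.h>

	#define PAGE_SZ 4096u

	int main(void)
	{
		uint64_t overlap = 10u << 10;	/* hypothetical 10 KiB overlap */
		uint64_t npages = (overlap + PAGE_SZ - 1) / PAGE_SZ;	/* 3 */
		uint64_t i;

		for (i = 0; i < npages; i++) {
			uint64_t len = overlap < PAGE_SZ ? overlap : PAGE_SZ;

			/* prints 4096, 4096, 2048 */
			printf("bvec %llu: offset 0, len %llu\n",
			       (unsigned long long)i, (unsigned long long)len);
			overlap -= len;
		}
		return 0;
	}
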
2414
3da691bf 2415static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
bf0d5f50 2416{
3da691bf 2417 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3da691bf 2418 int ret;
bf0d5f50 2419
86bd7998
ID
2420 rbd_assert(obj_req->num_img_extents);
2421 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2422 rbd_dev->parent_overlap);
2423 if (!obj_req->num_img_extents) {
3da691bf
ID
2424 /*
2425 * The overlap has become 0 (most likely because the
2426 * image has been flattened). Use rbd_obj_issue_copyup()
2427 * to re-submit the original write request -- the copyup
2428 * operation itself will be a no-op, since someone must
2429 * have populated the child object while we weren't
2430 * looking. Move to WRITE_FLAT state as we'll be done
2431 * with the operation once the null copyup completes.
2432 */
2433 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2434 return rbd_obj_issue_copyup(obj_req, 0);
bf0d5f50
AE
2435 }
2436
86bd7998 2437 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3da691bf
ID
2438 if (ret)
2439 return ret;
2440
2441 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
86bd7998 2442 return rbd_obj_read_from_parent(obj_req);
bf0d5f50 2443}
8b3e1a56 2444
3da691bf 2445static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
8b3e1a56 2446{
3da691bf 2447 int ret;
8b3e1a56 2448
3da691bf
ID
2449again:
2450 switch (obj_req->write_state) {
2451 case RBD_OBJ_WRITE_GUARD:
2452 rbd_assert(!obj_req->xferred);
2453 if (obj_req->result == -ENOENT) {
2454 /*
2455 * The target object doesn't exist. Read the data for
2456 * the entire target object up to the overlap point (if
2457 * any) from the parent, so we can use it for a copyup.
2458 */
2459 ret = rbd_obj_handle_write_guard(obj_req);
2460 if (ret) {
2461 obj_req->result = ret;
2462 return true;
2463 }
2464 return false;
2465 }
2466 /* fall through */
2467 case RBD_OBJ_WRITE_FLAT:
2468 if (!obj_req->result)
2469 /*
2470 * There is no such thing as a successful short
2471 * write -- indicate the whole request was satisfied.
2472 */
43df3d35 2473 obj_req->xferred = obj_req->ex.oe_len;
3da691bf
ID
2474 return true;
2475 case RBD_OBJ_WRITE_COPYUP:
2476 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2477 if (obj_req->result)
2478 goto again;
8b3e1a56 2479
3da691bf
ID
2480 rbd_assert(obj_req->xferred);
2481 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2482 if (ret) {
2483 obj_req->result = ret;
2484 return true;
2485 }
2486 return false;
2487 default:
c6244b3b 2488 BUG();
3da691bf
ID
2489 }
2490}
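
 Pulling the state machine above apart: a FLAT write completes directly; a
 GUARD write that hits -ENOENT reads the parent data and parks in COPYUP;
 when the parent read completes, COPYUP flips back to GUARD and issues the
 copyup request, whose completion then falls through the FLAT leg. A rough,
 illustrative paraphrase (names and error handling simplified):

	#include <stdio.h>
	#include <errno.h>

	enum wstate { W_FLAT, W_GUARD, W_COPYUP };

	/* returns 1 if the request is finished, 0 if more I/O was started */
	static int handle_write(enum wstate *st, int result)
	{
	again:
		switch (*st) {
		case W_GUARD:
			if (result == -ENOENT) {
				*st = W_COPYUP;	/* parent read in flight */
				return 0;
			}
			/* fall through */
		case W_FLAT:
			return 1;	/* success or hard error: done */
		case W_COPYUP:
			*st = W_GUARD;
			if (result)
				goto again;	/* parent read failed: finish */
			return 0;		/* copyup request submitted */
		}
		return 1;
	}

	int main(void)
	{
		enum wstate st = W_GUARD;

		printf("guard, -ENOENT -> done=%d\n", handle_write(&st, -ENOENT));
		printf("copyup, ok     -> done=%d\n", handle_write(&st, 0));
		printf("guard, ok      -> done=%d\n", handle_write(&st, 0));
		return 0;
	}
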
02c74fba 2491
3da691bf
ID
2492/*
2493 * Returns true if @obj_req is completed, or false otherwise.
2494 */
2495static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2496{
9bb0248d 2497 switch (obj_req->img_request->op_type) {
3da691bf
ID
2498 case OBJ_OP_READ:
2499 return rbd_obj_handle_read(obj_req);
2500 case OBJ_OP_WRITE:
2501 return rbd_obj_handle_write(obj_req);
2502 case OBJ_OP_DISCARD:
2503 if (rbd_obj_handle_write(obj_req)) {
2504 /*
2505 * Hide -ENOENT from delete/truncate/zero -- discarding
2506 * a non-existent object is not a problem.
2507 */
2508 if (obj_req->result == -ENOENT) {
2509 obj_req->result = 0;
43df3d35 2510 obj_req->xferred = obj_req->ex.oe_len;
3da691bf
ID
2511 }
2512 return true;
2513 }
2514 return false;
2515 default:
c6244b3b 2516 BUG();
3da691bf
ID
2517 }
2518}
02c74fba 2519
7114edac
ID
2520static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2521{
2522 struct rbd_img_request *img_req = obj_req->img_request;
2523
2524 rbd_assert((!obj_req->result &&
43df3d35 2525 obj_req->xferred == obj_req->ex.oe_len) ||
7114edac
ID
2526 (obj_req->result < 0 && !obj_req->xferred));
2527 if (!obj_req->result) {
2528 img_req->xferred += obj_req->xferred;
980917fc 2529 return;
02c74fba 2530 }
a9e8ba2c 2531
7114edac
ID
2532 rbd_warn(img_req->rbd_dev,
2533 "%s at objno %llu %llu~%llu result %d xferred %llu",
43df3d35
ID
2534 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2535 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
7114edac
ID
2536 obj_req->xferred);
2537 if (!img_req->result) {
2538 img_req->result = obj_req->result;
2539 img_req->xferred = 0;
2540 }
2541}
a9e8ba2c 2542
3da691bf
ID
2543static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2544{
2545 struct rbd_obj_request *obj_req = img_req->obj_request;
a9e8ba2c 2546
3da691bf 2547 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
86bd7998
ID
2548 rbd_assert((!img_req->result &&
2549 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2550 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2551
3da691bf
ID
2552 obj_req->result = img_req->result;
2553 obj_req->xferred = img_req->xferred;
2554 rbd_img_request_put(img_req);
8b3e1a56
AE
2555}
2556
7114edac 2557static void rbd_img_end_request(struct rbd_img_request *img_req)
8b3e1a56 2558{
7114edac
ID
2559 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2560 rbd_assert((!img_req->result &&
2561 img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2562 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2563
7114edac
ID
2564 blk_mq_end_request(img_req->rq,
2565 errno_to_blk_status(img_req->result));
2566 rbd_img_request_put(img_req);
3da691bf 2567}
8b3e1a56 2568
3da691bf
ID
2569static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2570{
7114edac 2571 struct rbd_img_request *img_req;
8b3e1a56 2572
7114edac 2573again:
3da691bf
ID
2574 if (!__rbd_obj_handle_request(obj_req))
2575 return;
8b3e1a56 2576
7114edac
ID
2577 img_req = obj_req->img_request;
2578 spin_lock(&img_req->completion_lock);
2579 rbd_obj_end_request(obj_req);
2580 rbd_assert(img_req->pending_count);
2581 if (--img_req->pending_count) {
2582 spin_unlock(&img_req->completion_lock);
2583 return;
2584 }
8b3e1a56 2585
7114edac
ID
2586 spin_unlock(&img_req->completion_lock);
2587 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2588 obj_req = img_req->obj_request;
2589 rbd_img_end_child_request(img_req);
2590 goto again;
2591 }
2592 rbd_img_end_request(img_req);
8b3e1a56 2593}
bf0d5f50 2594
ed95b21a 2595static const struct rbd_client_id rbd_empty_cid;
b8d70035 2596
ed95b21a
ID
2597static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2598 const struct rbd_client_id *rhs)
2599{
2600 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2601}
2602
2603static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2604{
2605 struct rbd_client_id cid;
2606
2607 mutex_lock(&rbd_dev->watch_mutex);
2608 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2609 cid.handle = rbd_dev->watch_cookie;
2610 mutex_unlock(&rbd_dev->watch_mutex);
2611 return cid;
2612}
2613
2614/*
2615 * lock_rwsem must be held for write
2616 */
2617static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2618 const struct rbd_client_id *cid)
2619{
2620 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2621 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2622 cid->gid, cid->handle);
2623 rbd_dev->owner_cid = *cid; /* struct */
2624}
2625
2626static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2627{
2628 mutex_lock(&rbd_dev->watch_mutex);
2629 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2630 mutex_unlock(&rbd_dev->watch_mutex);
2631}
2632
edd8ca80
FM
2633static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2634{
2635 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2636
2637 strcpy(rbd_dev->lock_cookie, cookie);
2638 rbd_set_owner_cid(rbd_dev, &cid);
2639 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2640}
2641
ed95b21a
ID
2642/*
2643 * lock_rwsem must be held for write
2644 */
2645static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 2646{
922dab61 2647 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2648 char cookie[32];
e627db08 2649 int ret;
b8d70035 2650
cbbfb0ff
ID
2651 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2652 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 2653
ed95b21a
ID
2654 format_lock_cookie(rbd_dev, cookie);
2655 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2656 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2657 RBD_LOCK_TAG, "", 0);
e627db08 2658 if (ret)
ed95b21a 2659 return ret;
b8d70035 2660
ed95b21a 2661 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
edd8ca80 2662 __rbd_lock(rbd_dev, cookie);
ed95b21a 2663 return 0;
b8d70035
AE
2664}
2665
ed95b21a
ID
2666/*
2667 * lock_rwsem must be held for write
2668 */
bbead745 2669static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 2670{
922dab61 2671 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
2672 int ret;
2673
cbbfb0ff
ID
2674 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2675 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 2676
ed95b21a 2677 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 2678 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
2679 if (ret && ret != -ENOENT)
2680 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 2681
bbead745
ID
2682 /* treat errors as the image is unlocked */
2683 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 2684 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
2685 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2686 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
2687}
2688
ed95b21a
ID
2689static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2690 enum rbd_notify_op notify_op,
2691 struct page ***preply_pages,
2692 size_t *preply_len)
9969ebc5
AE
2693{
2694 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2695 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
08a79102
KS
2696 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2697 int buf_size = sizeof(buf);
ed95b21a 2698 void *p = buf;
9969ebc5 2699
ed95b21a 2700 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 2701
ed95b21a
ID
2702 /* encode *LockPayload NotifyMessage (op + ClientId) */
2703 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2704 ceph_encode_32(&p, notify_op);
2705 ceph_encode_64(&p, cid.gid);
2706 ceph_encode_64(&p, cid.handle);
8eb87565 2707
ed95b21a
ID
2708 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2709 &rbd_dev->header_oloc, buf, buf_size,
2710 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
2711}
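
 The 26-byte buffer above is the ceph "start encoding" block (u8 struct_v,
 u8 compat, le32 payload length; CEPH_ENCODING_START_BLK_LEN is those 6
 bytes) followed by the notify op and the two ClientId words. A hedged
 user-space sketch of the layout, assuming that framing; the op and cid
 values are invented:

	#include <stdio.h>
	#include <stdint.h>

	static uint8_t *put_le32(uint8_t *p, uint32_t v)
	{
		int i;

		for (i = 0; i < 4; i++)
			*p++ = (uint8_t)(v >> (8 * i));
		return p;
	}

	static uint8_t *put_le64(uint8_t *p, uint64_t v)
	{
		int i;

		for (i = 0; i < 8; i++)
			*p++ = (uint8_t)(v >> (8 * i));
		return p;
	}

	int main(void)
	{
		uint8_t buf[4 + 8 + 8 + 6];	/* op + gid + handle + block */
		uint8_t *p = buf;

		/* start encoding block: struct_v 2, compat 1, payload len */
		*p++ = 2;
		*p++ = 1;
		p = put_le32(p, 4 + 8 + 8);

		p = put_le32(p, 3);		/* hypothetical notify_op */
		p = put_le64(p, 4242);		/* cid.gid */
		p = put_le64(p, 1);		/* cid.handle */

		printf("encoded %zu bytes\n", (size_t)(p - buf));
		return 0;
	}
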
2712
ed95b21a
ID
2713static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2714 enum rbd_notify_op notify_op)
b30a01f2 2715{
ed95b21a
ID
2716 struct page **reply_pages;
2717 size_t reply_len;
b30a01f2 2718
ed95b21a
ID
2719 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2720 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2721}
b30a01f2 2722
ed95b21a
ID
2723static void rbd_notify_acquired_lock(struct work_struct *work)
2724{
2725 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2726 acquired_lock_work);
76756a51 2727
ed95b21a 2728 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
2729}
2730
ed95b21a 2731static void rbd_notify_released_lock(struct work_struct *work)
c525f036 2732{
ed95b21a
ID
2733 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2734 released_lock_work);
811c6688 2735
ed95b21a 2736 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
2737}
2738
ed95b21a 2739static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 2740{
ed95b21a
ID
2741 struct page **reply_pages;
2742 size_t reply_len;
2743 bool lock_owner_responded = false;
36be9a76
AE
2744 int ret;
2745
ed95b21a 2746 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 2747
ed95b21a
ID
2748 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2749 &reply_pages, &reply_len);
2750 if (ret && ret != -ETIMEDOUT) {
2751 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 2752 goto out;
ed95b21a 2753 }
36be9a76 2754
ed95b21a
ID
2755 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2756 void *p = page_address(reply_pages[0]);
2757 void *const end = p + reply_len;
2758 u32 n;
36be9a76 2759
ed95b21a
ID
2760 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2761 while (n--) {
2762 u8 struct_v;
2763 u32 len;
36be9a76 2764
ed95b21a
ID
2765 ceph_decode_need(&p, end, 8 + 8, e_inval);
2766 p += 8 + 8; /* skip gid and cookie */
04017e29 2767
ed95b21a
ID
2768 ceph_decode_32_safe(&p, end, len, e_inval);
2769 if (!len)
2770 continue;
2771
2772 if (lock_owner_responded) {
2773 rbd_warn(rbd_dev,
2774 "duplicate lock owners detected");
2775 ret = -EIO;
2776 goto out;
2777 }
2778
2779 lock_owner_responded = true;
2780 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2781 &struct_v, &len);
2782 if (ret) {
2783 rbd_warn(rbd_dev,
2784 "failed to decode ResponseMessage: %d",
2785 ret);
2786 goto e_inval;
2787 }
2788
2789 ret = ceph_decode_32(&p);
2790 }
2791 }
2792
2793 if (!lock_owner_responded) {
2794 rbd_warn(rbd_dev, "no lock owners detected");
2795 ret = -ETIMEDOUT;
2796 }
2797
2798out:
2799 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2800 return ret;
2801
2802e_inval:
2803 ret = -EINVAL;
2804 goto out;
2805}
2806
2807static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2808{
2809 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2810
2811 cancel_delayed_work(&rbd_dev->lock_dwork);
2812 if (wake_all)
2813 wake_up_all(&rbd_dev->lock_waitq);
2814 else
2815 wake_up(&rbd_dev->lock_waitq);
2816}
2817
2818static int get_lock_owner_info(struct rbd_device *rbd_dev,
2819 struct ceph_locker **lockers, u32 *num_lockers)
2820{
2821 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2822 u8 lock_type;
2823 char *lock_tag;
2824 int ret;
2825
2826 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2827
2828 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2829 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2830 &lock_type, &lock_tag, lockers, num_lockers);
2831 if (ret)
2832 return ret;
2833
2834 if (*num_lockers == 0) {
2835 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2836 goto out;
2837 }
2838
2839 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2840 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2841 lock_tag);
2842 ret = -EBUSY;
2843 goto out;
2844 }
2845
2846 if (lock_type == CEPH_CLS_LOCK_SHARED) {
2847 rbd_warn(rbd_dev, "shared lock type detected");
2848 ret = -EBUSY;
2849 goto out;
2850 }
2851
2852 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2853 strlen(RBD_LOCK_COOKIE_PREFIX))) {
2854 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2855 (*lockers)[0].id.cookie);
2856 ret = -EBUSY;
2857 goto out;
2858 }
2859
2860out:
2861 kfree(lock_tag);
2862 return ret;
2863}
2864
2865static int find_watcher(struct rbd_device *rbd_dev,
2866 const struct ceph_locker *locker)
2867{
2868 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2869 struct ceph_watch_item *watchers;
2870 u32 num_watchers;
2871 u64 cookie;
2872 int i;
2873 int ret;
2874
2875 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2876 &rbd_dev->header_oloc, &watchers,
2877 &num_watchers);
2878 if (ret)
2879 return ret;
2880
2881 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2882 for (i = 0; i < num_watchers; i++) {
2883 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2884 sizeof(locker->info.addr)) &&
2885 watchers[i].cookie == cookie) {
2886 struct rbd_client_id cid = {
2887 .gid = le64_to_cpu(watchers[i].name.num),
2888 .handle = cookie,
2889 };
2890
2891 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2892 rbd_dev, cid.gid, cid.handle);
2893 rbd_set_owner_cid(rbd_dev, &cid);
2894 ret = 1;
2895 goto out;
2896 }
2897 }
2898
2899 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2900 ret = 0;
2901out:
2902 kfree(watchers);
2903 return ret;
2904}
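
 find_watcher() recovers the watch cookie from the lock cookie string that
 format_lock_cookie() built, so the two must agree on the format: the prefix,
 a space, then the u64. A round-trip sketch in user space; the "auto" prefix
 is an assumption standing in for RBD_LOCK_COOKIE_PREFIX:

	#include <stdio.h>

	#define LOCK_COOKIE_PREFIX "auto"	/* assumed prefix value */

	int main(void)
	{
		char buf[32];
		unsigned long long watch_cookie = 94025;	/* sample */
		unsigned long long parsed = 0;

		/* format_lock_cookie() side */
		snprintf(buf, sizeof(buf), "%s %llu",
			 LOCK_COOKIE_PREFIX, watch_cookie);

		/* find_watcher() side */
		sscanf(buf, LOCK_COOKIE_PREFIX " %llu", &parsed);

		printf("cookie \"%s\" -> %llu (match: %d)\n", buf, parsed,
		       parsed == watch_cookie);
		return 0;
	}
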
2905
2906/*
2907 * lock_rwsem must be held for write
2908 */
2909static int rbd_try_lock(struct rbd_device *rbd_dev)
2910{
2911 struct ceph_client *client = rbd_dev->rbd_client->client;
2912 struct ceph_locker *lockers;
2913 u32 num_lockers;
2914 int ret;
2915
2916 for (;;) {
2917 ret = rbd_lock(rbd_dev);
2918 if (ret != -EBUSY)
2919 return ret;
2920
2921 /* determine if the current lock holder is still alive */
2922 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2923 if (ret)
2924 return ret;
2925
2926 if (num_lockers == 0)
2927 goto again;
2928
2929 ret = find_watcher(rbd_dev, lockers);
2930 if (ret) {
2931 if (ret > 0)
2932 ret = 0; /* have to request lock */
2933 goto out;
2934 }
2935
2936 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2937 ENTITY_NAME(lockers[0].id.name));
2938
2939 ret = ceph_monc_blacklist_add(&client->monc,
2940 &lockers[0].info.addr);
2941 if (ret) {
2942 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2943 ENTITY_NAME(lockers[0].id.name), ret);
2944 goto out;
2945 }
2946
2947 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2948 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2949 lockers[0].id.cookie,
2950 &lockers[0].id.name);
2951 if (ret && ret != -ENOENT)
2952 goto out;
2953
2954again:
2955 ceph_free_lockers(lockers, num_lockers);
2956 }
2957
2958out:
2959 ceph_free_lockers(lockers, num_lockers);
2960 return ret;
2961}
2962
2963/*
2964 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2965 */
2966static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2967 int *pret)
2968{
2969 enum rbd_lock_state lock_state;
2970
2971 down_read(&rbd_dev->lock_rwsem);
2972 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
2973 rbd_dev->lock_state);
2974 if (__rbd_is_lock_owner(rbd_dev)) {
2975 lock_state = rbd_dev->lock_state;
2976 up_read(&rbd_dev->lock_rwsem);
2977 return lock_state;
2978 }
2979
2980 up_read(&rbd_dev->lock_rwsem);
2981 down_write(&rbd_dev->lock_rwsem);
2982 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
2983 rbd_dev->lock_state);
2984 if (!__rbd_is_lock_owner(rbd_dev)) {
2985 *pret = rbd_try_lock(rbd_dev);
2986 if (*pret)
2987 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
2988 }
2989
2990 lock_state = rbd_dev->lock_state;
2991 up_write(&rbd_dev->lock_rwsem);
2992 return lock_state;
2993}
2994
2995static void rbd_acquire_lock(struct work_struct *work)
2996{
2997 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
2998 struct rbd_device, lock_dwork);
2999 enum rbd_lock_state lock_state;
37f13252 3000 int ret = 0;
ed95b21a
ID
3001
3002 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3003again:
3004 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3005 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3006 if (lock_state == RBD_LOCK_STATE_LOCKED)
3007 wake_requests(rbd_dev, true);
3008 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3009 rbd_dev, lock_state, ret);
3010 return;
3011 }
3012
3013 ret = rbd_request_lock(rbd_dev);
3014 if (ret == -ETIMEDOUT) {
3015 goto again; /* treat this as a dead client */
e010dd0a
ID
3016 } else if (ret == -EROFS) {
3017 rbd_warn(rbd_dev, "peer will not release lock");
3018 /*
3019 * If this is rbd_add_acquire_lock(), we want to fail
3020 * immediately -- reuse BLACKLISTED flag. Otherwise we
3021 * want to block.
3022 */
3023 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3024 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3025 /* wake "rbd map --exclusive" process */
3026 wake_requests(rbd_dev, false);
3027 }
ed95b21a
ID
3028 } else if (ret < 0) {
3029 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3030 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3031 RBD_RETRY_DELAY);
3032 } else {
3033 /*
3034 * lock owner acked, but resend if we don't see them
3035 * release the lock
3036 */
3037 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3038 rbd_dev);
3039 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3040 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3041 }
3042}
3043
3044/*
3045 * lock_rwsem must be held for write
3046 */
3047static bool rbd_release_lock(struct rbd_device *rbd_dev)
3048{
3049 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3050 rbd_dev->lock_state);
3051 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3052 return false;
3053
3054 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3055 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3056 /*
ed95b21a 3057 * Ensure that all in-flight IO is flushed.
52bb1f9b 3058 *
ed95b21a
ID
3059 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3060 * may be shared with other devices.
52bb1f9b 3061 */
ed95b21a
ID
3062 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3063 up_read(&rbd_dev->lock_rwsem);
3064
3065 down_write(&rbd_dev->lock_rwsem);
3066 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3067 rbd_dev->lock_state);
3068 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3069 return false;
3070
bbead745
ID
3071 rbd_unlock(rbd_dev);
3072 /*
 3073 * Give others a chance to grab the lock - otherwise we would
 3074 * re-acquire it almost immediately if new IO arrived during
 3075 * ceph_osdc_sync(). We need to ack our own notifications, so this
3076 * lock_dwork will be requeued from rbd_wait_state_locked()
3077 * after wake_requests() in rbd_handle_released_lock().
3078 */
3079 cancel_delayed_work(&rbd_dev->lock_dwork);
ed95b21a
ID
3080 return true;
3081}
3082
3083static void rbd_release_lock_work(struct work_struct *work)
3084{
3085 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3086 unlock_work);
3087
3088 down_write(&rbd_dev->lock_rwsem);
3089 rbd_release_lock(rbd_dev);
3090 up_write(&rbd_dev->lock_rwsem);
3091}
3092
3093static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3094 void **p)
3095{
3096 struct rbd_client_id cid = { 0 };
3097
3098 if (struct_v >= 2) {
3099 cid.gid = ceph_decode_64(p);
3100 cid.handle = ceph_decode_64(p);
3101 }
3102
3103 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3104 cid.handle);
3105 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3106 down_write(&rbd_dev->lock_rwsem);
3107 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3108 /*
3109 * we already know that the remote client is
3110 * the owner
3111 */
3112 up_write(&rbd_dev->lock_rwsem);
3113 return;
3114 }
3115
3116 rbd_set_owner_cid(rbd_dev, &cid);
3117 downgrade_write(&rbd_dev->lock_rwsem);
3118 } else {
3119 down_read(&rbd_dev->lock_rwsem);
3120 }
3121
3122 if (!__rbd_is_lock_owner(rbd_dev))
3123 wake_requests(rbd_dev, false);
3124 up_read(&rbd_dev->lock_rwsem);
3125}
3126
3127static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3128 void **p)
3129{
3130 struct rbd_client_id cid = { 0 };
3131
3132 if (struct_v >= 2) {
3133 cid.gid = ceph_decode_64(p);
3134 cid.handle = ceph_decode_64(p);
3135 }
3136
3137 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3138 cid.handle);
3139 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3140 down_write(&rbd_dev->lock_rwsem);
3141 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3142 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3143 __func__, rbd_dev, cid.gid, cid.handle,
3144 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3145 up_write(&rbd_dev->lock_rwsem);
3146 return;
3147 }
3148
3149 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3150 downgrade_write(&rbd_dev->lock_rwsem);
3151 } else {
3152 down_read(&rbd_dev->lock_rwsem);
3153 }
3154
3155 if (!__rbd_is_lock_owner(rbd_dev))
3156 wake_requests(rbd_dev, false);
3157 up_read(&rbd_dev->lock_rwsem);
3158}
3159
3b77faa0
ID
3160/*
3161 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3162 * ResponseMessage is needed.
3163 */
3164static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3165 void **p)
ed95b21a
ID
3166{
3167 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3168 struct rbd_client_id cid = { 0 };
3b77faa0 3169 int result = 1;
ed95b21a
ID
3170
3171 if (struct_v >= 2) {
3172 cid.gid = ceph_decode_64(p);
3173 cid.handle = ceph_decode_64(p);
3174 }
3175
3176 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3177 cid.handle);
3178 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3179 return result;
ed95b21a
ID
3180
3181 down_read(&rbd_dev->lock_rwsem);
3b77faa0
ID
3182 if (__rbd_is_lock_owner(rbd_dev)) {
3183 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3184 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3185 goto out_unlock;
3186
3187 /*
3188 * encode ResponseMessage(0) so the peer can detect
3189 * a missing owner
3190 */
3191 result = 0;
3192
3193 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
e010dd0a
ID
3194 if (!rbd_dev->opts->exclusive) {
3195 dout("%s rbd_dev %p queueing unlock_work\n",
3196 __func__, rbd_dev);
3197 queue_work(rbd_dev->task_wq,
3198 &rbd_dev->unlock_work);
3199 } else {
3200 /* refuse to release the lock */
3201 result = -EROFS;
3202 }
ed95b21a
ID
3203 }
3204 }
3b77faa0
ID
3205
3206out_unlock:
ed95b21a 3207 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3208 return result;
ed95b21a
ID
3209}
3210
3211static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3212 u64 notify_id, u64 cookie, s32 *result)
3213{
3214 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
08a79102
KS
3215 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3216 int buf_size = sizeof(buf);
ed95b21a
ID
3217 int ret;
3218
3219 if (result) {
3220 void *p = buf;
3221
3222 /* encode ResponseMessage */
3223 ceph_start_encoding(&p, 1, 1,
3224 buf_size - CEPH_ENCODING_START_BLK_LEN);
3225 ceph_encode_32(&p, *result);
3226 } else {
3227 buf_size = 0;
3228 }
b8d70035 3229
922dab61
ID
3230 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3231 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3232 buf, buf_size);
52bb1f9b 3233 if (ret)
ed95b21a
ID
3234 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3235}
3236
3237static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3238 u64 cookie)
3239{
3240 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3241 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3242}
3243
3244static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3245 u64 notify_id, u64 cookie, s32 result)
3246{
3247 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3248 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3249}
3250
3251static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3252 u64 notifier_id, void *data, size_t data_len)
3253{
3254 struct rbd_device *rbd_dev = arg;
3255 void *p = data;
3256 void *const end = p + data_len;
d4c2269b 3257 u8 struct_v = 0;
ed95b21a
ID
3258 u32 len;
3259 u32 notify_op;
3260 int ret;
3261
3262 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3263 __func__, rbd_dev, cookie, notify_id, data_len);
3264 if (data_len) {
3265 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3266 &struct_v, &len);
3267 if (ret) {
3268 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3269 ret);
3270 return;
3271 }
3272
3273 notify_op = ceph_decode_32(&p);
3274 } else {
3275 /* legacy notification for header updates */
3276 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3277 len = 0;
3278 }
3279
3280 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3281 switch (notify_op) {
3282 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3283 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3284 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3285 break;
3286 case RBD_NOTIFY_OP_RELEASED_LOCK:
3287 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3288 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3289 break;
3290 case RBD_NOTIFY_OP_REQUEST_LOCK:
3b77faa0
ID
3291 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3292 if (ret <= 0)
ed95b21a 3293 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3294 cookie, ret);
ed95b21a
ID
3295 else
3296 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3297 break;
3298 case RBD_NOTIFY_OP_HEADER_UPDATE:
3299 ret = rbd_dev_refresh(rbd_dev);
3300 if (ret)
3301 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3302
3303 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3304 break;
3305 default:
3306 if (rbd_is_lock_owner(rbd_dev))
3307 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3308 cookie, -EOPNOTSUPP);
3309 else
3310 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3311 break;
3312 }
b8d70035
AE
3313}
3314
99d16943
ID
3315static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3316
922dab61 3317static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3318{
922dab61 3319 struct rbd_device *rbd_dev = arg;
bb040aa0 3320
922dab61 3321 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3322
ed95b21a
ID
3323 down_write(&rbd_dev->lock_rwsem);
3324 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3325 up_write(&rbd_dev->lock_rwsem);
3326
99d16943
ID
3327 mutex_lock(&rbd_dev->watch_mutex);
3328 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3329 __rbd_unregister_watch(rbd_dev);
3330 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3331
99d16943 3332 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3333 }
99d16943 3334 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3335}
3336
9969ebc5 3337/*
99d16943 3338 * watch_mutex must be locked
9969ebc5 3339 */
99d16943 3340static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3341{
3342 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3343 struct ceph_osd_linger_request *handle;
9969ebc5 3344
922dab61 3345 rbd_assert(!rbd_dev->watch_handle);
99d16943 3346 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3347
922dab61
ID
3348 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3349 &rbd_dev->header_oloc, rbd_watch_cb,
3350 rbd_watch_errcb, rbd_dev);
3351 if (IS_ERR(handle))
3352 return PTR_ERR(handle);
8eb87565 3353
922dab61 3354 rbd_dev->watch_handle = handle;
b30a01f2 3355 return 0;
b30a01f2
ID
3356}
3357
99d16943
ID
3358/*
3359 * watch_mutex must be locked
3360 */
3361static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3362{
922dab61
ID
3363 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3364 int ret;
b30a01f2 3365
99d16943
ID
3366 rbd_assert(rbd_dev->watch_handle);
3367 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3368
922dab61
ID
3369 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3370 if (ret)
3371 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3372
922dab61 3373 rbd_dev->watch_handle = NULL;
c525f036
ID
3374}
3375
99d16943
ID
3376static int rbd_register_watch(struct rbd_device *rbd_dev)
3377{
3378 int ret;
3379
3380 mutex_lock(&rbd_dev->watch_mutex);
3381 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3382 ret = __rbd_register_watch(rbd_dev);
3383 if (ret)
3384 goto out;
3385
3386 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3387 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3388
3389out:
3390 mutex_unlock(&rbd_dev->watch_mutex);
3391 return ret;
3392}
3393
3394static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3395{
99d16943
ID
3396 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3397
3398 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ed95b21a
ID
3399 cancel_work_sync(&rbd_dev->acquired_lock_work);
3400 cancel_work_sync(&rbd_dev->released_lock_work);
3401 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3402 cancel_work_sync(&rbd_dev->unlock_work);
99d16943
ID
3403}
3404
3405static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3406{
ed95b21a 3407 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
99d16943
ID
3408 cancel_tasks_sync(rbd_dev);
3409
3410 mutex_lock(&rbd_dev->watch_mutex);
3411 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3412 __rbd_unregister_watch(rbd_dev);
3413 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3414 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3415
811c6688 3416 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3417}
3418
14bb211d
ID
3419/*
3420 * lock_rwsem must be held for write
3421 */
3422static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3423{
3424 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3425 char cookie[32];
3426 int ret;
3427
3428 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3429
3430 format_lock_cookie(rbd_dev, cookie);
3431 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3432 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3433 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3434 RBD_LOCK_TAG, cookie);
3435 if (ret) {
3436 if (ret != -EOPNOTSUPP)
3437 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3438 ret);
3439
3440 /*
3441 * Lock cookie cannot be updated on older OSDs, so do
3442 * a manual release and queue an acquire.
3443 */
3444 if (rbd_release_lock(rbd_dev))
3445 queue_delayed_work(rbd_dev->task_wq,
3446 &rbd_dev->lock_dwork, 0);
3447 } else {
edd8ca80 3448 __rbd_lock(rbd_dev, cookie);
14bb211d
ID
3449 }
3450}
3451
static void rbd_reregister_watch(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, watch_dwork);
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	ret = __rbd_register_watch(rbd_dev);
	if (ret) {
		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
		if (ret == -EBLACKLISTED || ret == -ENOENT) {
			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
			wake_requests(rbd_dev, true);
		} else {
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->watch_dwork,
					   RBD_RETRY_DELAY);
		}
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
	mutex_unlock(&rbd_dev->watch_mutex);

	down_write(&rbd_dev->lock_rwsem);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		rbd_reacquire_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
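/*
 * Typical use (this mirrors the "get_size" call further down): pass
 * the snapshot id as outbound data and read the reply into a packed
 * buffer, e.g.:
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 */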
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			       struct ceph_object_id *oid,
			       struct ceph_object_locator *oloc,
			       const char *method_name,
			       const void *outbound,
			       size_t outbound_size,
			       void *inbound,
			       size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct page *req_page = NULL;
	struct page *reply_page;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	if (outbound) {
		if (outbound_size > PAGE_SIZE)
			return -E2BIG;

		req_page = alloc_page(GFP_KERNEL);
		if (!req_page)
			return -ENOMEM;

		memcpy(page_address(req_page), outbound, outbound_size);
	}

	reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		if (req_page)
			__free_page(req_page);
		return -ENOMEM;
	}

	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
			     reply_page, &inbound_size);
	if (!ret) {
		memcpy(inbound, page_address(reply_page), inbound_size);
		ret = inbound_size;
	}

	if (req_page)
		__free_page(req_page);
	__free_page(reply_page);
	return ret;
}

/*
 * lock_rwsem must be held for read
 */
static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
{
	DEFINE_WAIT(wait);
	unsigned long timeout;
	int ret = 0;

	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
		return -EBLACKLISTED;

	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		return 0;

	if (!may_acquire) {
		rbd_warn(rbd_dev, "exclusive lock required");
		return -EROFS;
	}

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		timeout = schedule_timeout(ceph_timeout_jiffies(
						rbd_dev->opts->lock_timeout));
		down_read(&rbd_dev->lock_rwsem);
		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			ret = -EBLACKLISTED;
			break;
		}
		if (!timeout) {
			rbd_warn(rbd_dev, "timed out waiting for lock");
			ret = -ETIMEDOUT;
			break;
		}
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	finish_wait(&rbd_dev->lock_waitq, &wait);
	return ret;
}

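/*
 * Workqueue callback that does the actual request processing.  Each
 * blk-mq request's pdu is a work_struct (see rbd_init_request()), so
 * the translation of a block request into an image request happens
 * here, in process context.
 */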
static void rbd_queue_workfn(struct work_struct *work)
{
	struct request *rq = blk_mq_rq_from_pdu(work);
	struct rbd_device *rbd_dev = rq->q->queuedata;
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	enum obj_operation_type op_type;
	u64 mapping_size;
	bool must_be_locked;
	int result;

	switch (req_op(rq)) {
	case REQ_OP_DISCARD:
	case REQ_OP_WRITE_ZEROES:
		op_type = OBJ_OP_DISCARD;
		break;
	case REQ_OP_WRITE:
		op_type = OBJ_OP_WRITE;
		break;
	case REQ_OP_READ:
		op_type = OBJ_OP_READ;
		break;
	default:
		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
		result = -EIO;
		goto err;
	}

	/* Ignore/skip any zero-length requests */

	if (!length) {
		dout("%s: zero-length request\n", __func__);
		result = 0;
		goto err_rq;
	}

	rbd_assert(op_type == OBJ_OP_READ ||
		   rbd_dev->spec->snap_id == CEPH_NOSNAP);

	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
	 * still possible the snapshot will have disappeared by the
	 * time our request arrives at the osd, but there's no sense in
	 * sending it if we already know.
	 */
	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
		dout("request for non-existent snapshot");
		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
		result = -ENXIO;
		goto err_rq;
	}

	if (offset && length > U64_MAX - offset + 1) {
		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
			 length);
		result = -EINVAL;
		goto err_rq;	/* Shouldn't happen */
	}

	blk_mq_start_request(rq);

	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (op_type != OBJ_OP_READ) {
		snapc = rbd_dev->header.snapc;
		ceph_get_snap_context(snapc);
	}
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, mapping_size);
		result = -EIO;
		goto err_rq;
	}

	must_be_locked =
	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
	if (must_be_locked) {
		down_read(&rbd_dev->lock_rwsem);
		result = rbd_wait_state_locked(rbd_dev,
					       !rbd_dev->opts->exclusive);
		if (result)
			goto err_unlock;
	}

	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_unlock;
	}
	img_request->rq = rq;
	snapc = NULL; /* img_request consumes a ref */

	if (op_type == OBJ_OP_DISCARD)
		result = rbd_img_fill_nodata(img_request, offset, length);
	else
		result = rbd_img_fill_from_bio(img_request, offset, length,
					       rq->bio);
	if (result)
		goto err_img_request;

	rbd_img_request_submit(img_request);
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
	return;

err_img_request:
	rbd_img_request_put(img_request);
err_unlock:
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
err_rq:
	if (result)
		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
			 obj_op_name(op_type), length, offset, result);
	ceph_put_snap_context(snapc);
err:
	blk_mq_end_request(rq, errno_to_blk_status(result));
}

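/*
 * blk-mq ->queue_rq(): hand the request's embedded work item off to
 * rbd_wq; all real work happens in rbd_queue_workfn().
 */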
static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
		const struct blk_mq_queue_data *bd)
{
	struct request *rq = bd->rq;
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	queue_work(rbd_wq, work);
	return BLK_STS_OK;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	blk_cleanup_queue(rbd_dev->disk->queue);
	blk_mq_free_tag_set(&rbd_dev->tag_set);
	put_disk(rbd_dev->disk);
	rbd_dev->disk = NULL;
}

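/*
 * Synchronously read up to buf_len bytes from the start of the given
 * object; used below to fetch the format 1 on-disk header.
 */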
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
			     struct ceph_object_id *oid,
			     struct ceph_object_locator *oloc,
			     void *buf, int buf_len)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	struct page **pages;
	int num_pages = calc_pages_for(0, buf_len);
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_READ;

	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
	if (ret)
		goto out_req;

	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto out_req;
	}

	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
					 true);

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);
	if (ret >= 0)
		ceph_copy_from_page_vector(pages, buf, 0, ret);

out_req:
	ceph_osdc_put_request(req);
	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
					&rbd_dev->header_oloc, ondisk, size);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				 size, ret);
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	/*
	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
	 * try to update its size.  If REMOVING is set, updating size
	 * is just useless work since the device can't be opened.
	 */
	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}

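/*
 * Re-read the image header (and parent info, if any) under
 * header_rwsem and propagate a changed size to the block device.
 */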
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto out;

	/*
	 * If there is a parent, see if it has disappeared due to the
	 * mapped image getting flattened.
	 */
	if (rbd_dev->parent) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
	} else {
		/* validate mapped snapshot's EXISTS flag */
		rbd_exists_validate(rbd_dev);
	}

out:
	up_write(&rbd_dev->header_rwsem);
	if (!ret && mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}

static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
		unsigned int hctx_idx, unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	INIT_WORK(work, rbd_queue_workfn);
	return 0;
}

static const struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,
	.init_request	= rbd_init_request,
};

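/*
 * Allocate the gendisk, set up the single hardware queue blk-mq tag
 * set and derive the queue limits from the object-set size.
 */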
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	unsigned int objset_bytes =
	    rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
	int err;

	/* create gendisk info */
	disk = alloc_disk(single_major ?
			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
			  RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = rbd_dev->minor;
	if (single_major)
		disk->flags |= GENHD_FL_EXT_DEVT;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
	if (err)
		goto out_disk;

	q = blk_mq_init_queue(&rbd_dev->tag_set);
	if (IS_ERR(q)) {
		err = PTR_ERR(q);
		goto out_tag_set;
	}

	blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */

	blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
	q->limits.max_sectors = queue_max_hw_sectors(q);
	blk_queue_max_segments(q, USHRT_MAX);
	blk_queue_max_segment_size(q, UINT_MAX);
	blk_queue_io_min(q, objset_bytes);
	blk_queue_io_opt(q, objset_bytes);

	if (rbd_dev->opts->trim) {
		blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
		q->limits.discard_granularity = objset_bytes;
		blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
		blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
	}

	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;

	/*
	 * disk_release() expects a queue ref from add_disk() and will
	 * put it.  Hold an extra ref until add_disk() is called.
	 */
	WARN_ON(!blk_get_queue(q));
	disk->queue = q;
	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_tag_set:
	blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
	put_disk(disk);
	return err;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
		       (unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_minor_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->minor);
}

static ssize_t rbd_client_addr_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct ceph_entity_addr *client_addr =
	    ceph_client_addr(rbd_dev->rbd_client->client);

	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
		       le32_to_cpu(client_addr->nonce));
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_gid(rbd_dev->rbd_client->client));
}

static ssize_t rbd_cluster_fsid_show(struct device *dev,
				     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
}

static ssize_t rbd_config_info_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->config_info);
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
}

/*
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images or if there is no parent, shows "(no parent
 * image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			       struct device_attribute *attr,
			       char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	ssize_t count = 0;

	if (!rbd_dev->parent)
		return sprintf(buf, "(no parent image)\n");

	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
		struct rbd_spec *spec = rbd_dev->parent_spec;

		count += sprintf(&buf[count], "%s"
			    "pool_id %llu\npool_name %s\n"
			    "image_id %s\nimage_name %s\n"
			    "snap_id %llu\nsnap_name %s\n"
			    "overlap %llu\n",
			    !count ? "" : "\n", /* first? */
			    spec->pool_id, spec->pool_name,
			    spec->image_id, spec->image_name ?: "(unknown)",
			    spec->snap_id, spec->snap_name,
			    rbd_dev->parent_overlap);
	}

	return count;
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		return ret;

	return size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_addr.attr,
	&dev_attr_client_id.attr,
	&dev_attr_cluster_fsid.attr,
	&dev_attr_config_info.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_dev_release(struct device *dev);

static const struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;

	spec->pool_id = CEPH_NOPOOL;
	spec->snap_id = CEPH_NOSNAP;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

	ceph_oid_destroy(&rbd_dev->header_oid);
	ceph_oloc_destroy(&rbd_dev->header_oloc);
	kfree(rbd_dev->config_info);

	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	if (need_put) {
		destroy_workqueue(rbd_dev->task_wq);
		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
	}

	rbd_dev_free(rbd_dev);

	/*
	 * This is racy, but way better than putting module outside of
	 * the release callback.  The race window is pretty small, so
	 * doing something similar to dm (dm-builtin.c) is overkill.
	 */
	if (need_put)
		module_put(THIS_MODULE);
}

static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
					   struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
	ceph_oid_init(&rbd_dev->header_oid);
	rbd_dev->header_oloc.pool = spec->pool_id;

	mutex_init(&rbd_dev->watch_mutex);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);

	init_rwsem(&rbd_dev->lock_rwsem);
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
	init_waitqueue_head(&rbd_dev->lock_waitq);

	rbd_dev->dev.bus = &rbd_bus_type;
	rbd_dev->dev.type = &rbd_device_type;
	rbd_dev->dev.parent = &rbd_root_dev;
	device_initialize(&rbd_dev->dev);

	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;

	return rbd_dev;
}

/*
 * Create a mapping rbd_dev.
 */
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

	rbd_dev = __rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		return NULL;

	rbd_dev->opts = opts;

	/* get an id and fill in device name */
	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
					 minor_to_rbd_dev_id(1 << MINORBITS),
					 GFP_KERNEL);
	if (rbd_dev->dev_id < 0)
		goto fail_rbd_dev;

	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
						   rbd_dev->name);
	if (!rbd_dev->task_wq)
		goto fail_dev_id;

	/* we have a ref from do_rbd_add() */
	__module_get(THIS_MODULE);

	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
	return rbd_dev;

fail_dev_id:
	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
	rbd_dev_free(rbd_dev);
	return NULL;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	if (rbd_dev)
		put_device(&rbd_dev->dev);
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_size",
				  &snapid, sizeof(snapid),
				  &size_buf, sizeof(size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout("  order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx snap_size = %llu\n",
	     (unsigned long long)snap_id,
	     (unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
				     &rbd_dev->header.obj_order,
				     &rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_object_prefix",
				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

4500
b1b5402a
AE
4501static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4502 u64 *snap_features)
4503{
4504 __le64 snapid = cpu_to_le64(snap_id);
4505 struct {
4506 __le64 features;
4507 __le64 incompat;
4157976b 4508 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4509 u64 unsup;
b1b5402a
AE
4510 int ret;
4511
ecd4a68a
ID
4512 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4513 &rbd_dev->header_oloc, "get_features",
4514 &snapid, sizeof(snapid),
4515 &features_buf, sizeof(features_buf));
36be9a76 4516 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
4517 if (ret < 0)
4518 return ret;
57385b51
AE
4519 if (ret < sizeof (features_buf))
4520 return -ERANGE;
d889140c 4521
d3767f0f
ID
4522 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4523 if (unsup) {
4524 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4525 unsup);
b8f5c6ed 4526 return -ENXIO;
d3767f0f 4527 }
d889140c 4528
b1b5402a
AE
4529 *snap_features = le64_to_cpu(features_buf.features);
4530
4531 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
4532 (unsigned long long)snap_id,
4533 (unsigned long long)*snap_features,
4534 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
4535
4536 return 0;
4537}
4538
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
					 &rbd_dev->header.features);
}

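/*
 * The "get_parent" reply decodes, in order, as: pool_id (__le64),
 * image_id (length-prefixed string), snap_id (__le64) and overlap
 * (__le64); see the reply buffer size computation below.
 */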
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_parent",
				  &snapid, sizeof(snapid), reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, pool_id, out_err);
	if (pool_id == CEPH_NOPOOL) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
			 (unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we have not already done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	} else {
		kfree(image_id);
	}

	/*
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there was no parent.
	 */
	if (!overlap) {
		if (parent_spec) {
			/* refresh, careful to warn just once */
			if (rbd_dev->parent_overlap)
				rbd_warn(rbd_dev,
				    "clone now standalone (overlap became 0)");
		} else {
			/* initial probe */
			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
		}
	}
	rbd_dev->parent_overlap = overlap;

out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}

4662
cc070d59
AE
4663static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4664{
4665 struct {
4666 __le64 stripe_unit;
4667 __le64 stripe_count;
4668 } __attribute__ ((packed)) striping_info_buf = { 0 };
4669 size_t size = sizeof (striping_info_buf);
4670 void *p;
cc070d59
AE
4671 int ret;
4672
ecd4a68a
ID
4673 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4674 &rbd_dev->header_oloc, "get_stripe_unit_count",
4675 NULL, 0, &striping_info_buf, size);
cc070d59
AE
4676 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4677 if (ret < 0)
4678 return ret;
4679 if (ret < size)
4680 return -ERANGE;
4681
cc070d59 4682 p = &striping_info_buf;
b1331852
ID
4683 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4684 rbd_dev->header.stripe_count = ceph_decode_64(&p);
cc070d59
AE
4685 return 0;
4686}
4687
static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
{
	__le64 data_pool_id;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_data_pool",
				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
	if (ret < 0)
		return ret;
	if (ret < sizeof(data_pool_id))
		return -EBADMSG;

	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
	return 0;
}

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	CEPH_DEFINE_OID_ONSTACK(oid);
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				  "dir_get_name", image_id, image_id_size,
				  reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * An image being mapped will have everything but the snap id.
 */
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;

	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
	rbd_assert(spec->image_id && spec->image_name);
	rbd_assert(spec->snap_name);

	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
		u64 snap_id;

		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
		if (snap_id == CEPH_NOSNAP)
			return -ENOENT;

		spec->snap_id = snap_id;
	} else {
		spec->snap_id = CEPH_NOSNAP;
	}

	return 0;
}

/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	rbd_assert(spec->pool_id != CEPH_NOPOOL);
	rbd_assert(spec->image_id);
	rbd_assert(spec->snap_id != CEPH_NOSNAP);

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Fetch the snapshot name */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;

out_err:
	kfree(image_name);
	kfree(pool_name);
	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_snapcontext",
				  NULL, 0, reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
	     (unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_snapshot_name",
				  &snapid, sizeof(snapid), reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
	     (unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret && first_time) {
		kfree(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	}

	return ret;
}

static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_header_info(rbd_dev);

	return rbd_dev_v2_header_info(rbd_dev);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_name>
 *	An optional snapshot name.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot name is
 *	provided.  Snapshot mappings are always read-only.
 */
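/*
 * For example (illustrative values only; the exact option string
 * depends on the cluster configuration):
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage -" \
 *		> /sys/bus/rbd/add
 */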
859c31df 5134static int rbd_add_parse_args(const char *buf,
dc79b113 5135 struct ceph_options **ceph_opts,
859c31df
AE
5136 struct rbd_options **opts,
5137 struct rbd_spec **rbd_spec)
e28fff26 5138{
d22f76e7 5139 size_t len;
859c31df 5140 char *options;
0ddebc0c 5141 const char *mon_addrs;
ecb4dc22 5142 char *snap_name;
0ddebc0c 5143 size_t mon_addrs_size;
859c31df 5144 struct rbd_spec *spec = NULL;
4e9afeba 5145 struct rbd_options *rbd_opts = NULL;
859c31df 5146 struct ceph_options *copts;
dc79b113 5147 int ret;
e28fff26
AE
5148
5149 /* The first four tokens are required */
5150
7ef3214a 5151 len = next_token(&buf);
4fb5d671
AE
5152 if (!len) {
5153 rbd_warn(NULL, "no monitor address(es) provided");
5154 return -EINVAL;
5155 }
0ddebc0c 5156 mon_addrs = buf;
f28e565a 5157 mon_addrs_size = len + 1;
7ef3214a 5158 buf += len;
a725f65e 5159
dc79b113 5160 ret = -EINVAL;
f28e565a
AE
5161 options = dup_token(&buf, NULL);
5162 if (!options)
dc79b113 5163 return -ENOMEM;
4fb5d671
AE
5164 if (!*options) {
5165 rbd_warn(NULL, "no options provided");
5166 goto out_err;
5167 }
e28fff26 5168
859c31df
AE
5169 spec = rbd_spec_alloc();
5170 if (!spec)
f28e565a 5171 goto out_mem;
859c31df
AE
5172
5173 spec->pool_name = dup_token(&buf, NULL);
5174 if (!spec->pool_name)
5175 goto out_mem;
4fb5d671
AE
5176 if (!*spec->pool_name) {
5177 rbd_warn(NULL, "no pool name provided");
5178 goto out_err;
5179 }
e28fff26 5180
69e7a02f 5181 spec->image_name = dup_token(&buf, NULL);
859c31df 5182 if (!spec->image_name)
f28e565a 5183 goto out_mem;
4fb5d671
AE
5184 if (!*spec->image_name) {
5185 rbd_warn(NULL, "no image name provided");
5186 goto out_err;
5187 }
d4b125e9 5188
f28e565a
AE
5189 /*
5190 * Snapshot name is optional; default is to use "-"
5191 * (indicating the head/no snapshot).
5192 */
3feeb894 5193 len = next_token(&buf);
820a5f3e 5194 if (!len) {
3feeb894
AE
5195 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5196 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5197 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5198 ret = -ENAMETOOLONG;
f28e565a 5199 goto out_err;
849b4260 5200 }
ecb4dc22
AE
5201 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5202 if (!snap_name)
f28e565a 5203 goto out_mem;
ecb4dc22
AE
5204 *(snap_name + len) = '\0';
5205 spec->snap_name = snap_name;
e5c35534 5206
0ddebc0c 5207 /* Initialize all rbd options to the defaults */
e28fff26 5208
4e9afeba
AE
5209 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5210 if (!rbd_opts)
5211 goto out_mem;
5212
5213 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5214 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
34f55d0b 5215 rbd_opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
80de1912 5216 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5217 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d9360540 5218 rbd_opts->trim = RBD_TRIM_DEFAULT;
d22f76e7 5219
859c31df 5220 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5221 mon_addrs + mon_addrs_size - 1,
4e9afeba 5222 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5223 if (IS_ERR(copts)) {
5224 ret = PTR_ERR(copts);
dc79b113
AE
5225 goto out_err;
5226 }
859c31df
AE
5227 kfree(options);
5228
5229 *ceph_opts = copts;
4e9afeba 5230 *opts = rbd_opts;
859c31df 5231 *rbd_spec = spec;
0ddebc0c 5232
dc79b113 5233 return 0;
f28e565a 5234out_mem:
dc79b113 5235 ret = -ENOMEM;
d22f76e7 5236out_err:
859c31df
AE
5237 kfree(rbd_opts);
5238 rbd_spec_put(spec);
f28e565a 5239 kfree(options);
d22f76e7 5240
dc79b113 5241 return ret;
a725f65e
AE
5242}
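
/*
 * Editorial usage sketch -- not part of the original driver, wrapped
 * in #if 0 so it is never built.  The argument string and names below
 * are hypothetical; the sketch only illustrates the calling convention
 * and the ownership rules documented above rbd_add_parse_args().
 */
#if 0
static int rbd_parse_args_example(void)
{
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;
	struct rbd_spec *spec;
	int ret;

	/* mon_addrs, options, pool, image and (optional) snapshot */
	ret = rbd_add_parse_args("1.2.3.4:6789 name=admin rbd myimage snap1",
				 &ceph_opts, &rbd_opts, &spec);
	if (ret)
		return ret;

	/* ... use the parsed results here ... */

	/* release exactly as the comment above requires */
	ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
	return 0;
}
#endif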
5243
e010dd0a
ID
5244static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5245{
5246 down_write(&rbd_dev->lock_rwsem);
5247 if (__rbd_is_lock_owner(rbd_dev))
5248 rbd_unlock(rbd_dev);
5249 up_write(&rbd_dev->lock_rwsem);
5250}
5251
5252static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5253{
2f18d466
ID
5254 int ret;
5255
e010dd0a
ID
5256 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5257 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5258 return -EINVAL;
5259 }
5260
5261 /* FIXME: "rbd map --exclusive" should be interruptible */
5262 down_read(&rbd_dev->lock_rwsem);
2f18d466 5263 ret = rbd_wait_state_locked(rbd_dev, true);
e010dd0a 5264 up_read(&rbd_dev->lock_rwsem);
2f18d466 5265 if (ret) {
e010dd0a
ID
5266 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5267 return -EROFS;
5268 }
5269
5270 return 0;
5271}
5272
589d30e0
AE
5273/*
5274 * An rbd format 2 image has a unique identifier, distinct from the
5275 * name given to it by the user. Internally, that identifier is
5276 * what's used to specify the names of objects related to the image.
5277 *
5278 * A special "rbd id" object is used to map an rbd image name to its
5279 * id. If that object doesn't exist, then there is no v2 rbd image
5280 * with the supplied name.
5281 *
5282 * This function will record the given rbd_dev's image_id field if
5283 * it can be determined, and in that case will return 0. If any
5284 * errors occur a negative errno will be returned and the rbd_dev's
5285 * image_id field will be unchanged (and should be NULL).
5286 */
5287static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5288{
5289 int ret;
5290 size_t size;
ecd4a68a 5291 CEPH_DEFINE_OID_ONSTACK(oid);
589d30e0 5292 void *response;
c0fba368 5293 char *image_id;
2f82ee54 5294
2c0d0a10
AE
5295 /*
5296 * When probing a parent image, the image id is already
5297 * known (and the image name likely is not). There's no
c0fba368
AE
5298 * need to fetch the image id again in this case. We
5299 * do still need to set the image format though.
2c0d0a10 5300 */
c0fba368
AE
5301 if (rbd_dev->spec->image_id) {
5302 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5303
2c0d0a10 5304 return 0;
c0fba368 5305 }
2c0d0a10 5306
589d30e0
AE
5307 /*
5308 * First, see if the format 2 image id object exists, and if
5309 * so, get the image's persistent id from it.
5310 */
ecd4a68a
ID
5311 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5312 rbd_dev->spec->image_name);
5313 if (ret)
5314 return ret;
5315
5316 dout("rbd id object name is %s\n", oid.name);
589d30e0
AE
5317
5318 /* Response will be an encoded string, which includes a length */
5319
5320 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5321 response = kzalloc(size, GFP_NOIO);
5322 if (!response) {
5323 ret = -ENOMEM;
5324 goto out;
5325 }
5326
c0fba368
AE
5327 /* If it doesn't exist, we'll assume it's a format 1 image */
5328
ecd4a68a
ID
5329 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5330 "get_id", NULL, 0,
5331 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5332 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5333 if (ret == -ENOENT) {
5334 image_id = kstrdup("", GFP_KERNEL);
5335 ret = image_id ? 0 : -ENOMEM;
5336 if (!ret)
5337 rbd_dev->image_format = 1;
7dd440c9 5338 } else if (ret >= 0) {
c0fba368
AE
5339 void *p = response;
5340
5341 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5342 NULL, GFP_NOIO);
461f758a 5343 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5344 if (!ret)
5345 rbd_dev->image_format = 2;
c0fba368
AE
5346 }
5347
5348 if (!ret) {
5349 rbd_dev->spec->image_id = image_id;
5350 dout("image_id is %s\n", image_id);
589d30e0
AE
5351 }
5352out:
5353 kfree(response);
ecd4a68a 5354 ceph_oid_destroy(&oid);
589d30e0
AE
5355 return ret;
5356}
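
/*
 * Worked example (image name and id hypothetical): probing image
 * "myimage" reads the id object "rbd_id.myimage" (RBD_ID_PREFIX plus
 * the image name).  If that object is absent, the image is assumed to
 * be format 1 and image_id is recorded as ""; otherwise the decoded
 * string, e.g. "1f2c6651ab12", becomes spec->image_id and the image
 * format is 2.
 */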
5357
3abef3b3
AE
5358/*
5359 * Undo whatever state changes are made by v1 or v2 header info
5360 * call.
5361 */
6fd48b3b
AE
5362static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5363{
5364 struct rbd_image_header *header;
5365
e69b8d41 5366 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5367
5368 /* Free dynamic fields from the header, then zero it out */
5369
5370 header = &rbd_dev->header;
812164f8 5371 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5372 kfree(header->snap_sizes);
5373 kfree(header->snap_names);
5374 kfree(header->object_prefix);
5375 memset(header, 0, sizeof (*header));
5376}
5377
2df3fac7 5378static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5379{
5380 int ret;
a30b71b9 5381
1e130199 5382 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5383 if (ret)
b1b5402a
AE
5384 goto out_err;
5385
2df3fac7
AE
5386 /*
5387 * Get and check the features for the image. Currently the
5388 * features are assumed to never change.
5389 */
b1b5402a 5390 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5391 if (ret)
9d475de5 5392 goto out_err;
35d489f9 5393
cc070d59
AE
5394 /* If the image supports fancy striping, get its parameters */
5395
5396 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5397 ret = rbd_dev_v2_striping_info(rbd_dev);
5398 if (ret < 0)
5399 goto out_err;
5400 }
a30b71b9 5401
7e97332e
ID
5402 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5403 ret = rbd_dev_v2_data_pool(rbd_dev);
5404 if (ret)
5405 goto out_err;
5406 }
5407
263423f8 5408 rbd_init_layout(rbd_dev);
35152979 5409 return 0;
263423f8 5410
9d475de5 5411out_err:
642a2537 5412 rbd_dev->header.features = 0;
1e130199
AE
5413 kfree(rbd_dev->header.object_prefix);
5414 rbd_dev->header.object_prefix = NULL;
9d475de5 5415 return ret;
a30b71b9
AE
5416}
5417
6d69bb53
ID
5418/*
5419 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5420 * rbd_dev_image_probe() recursion depth, which means it's also the
5421 * length of the already discovered part of the parent chain.
5422 */
5423static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5424{
2f82ee54 5425 struct rbd_device *parent = NULL;
124afba2
AE
5426 int ret;
5427
5428 if (!rbd_dev->parent_spec)
5429 return 0;
124afba2 5430
6d69bb53
ID
5431 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5432 pr_info("parent chain is too long (%d)\n", depth);
5433 ret = -EINVAL;
5434 goto out_err;
5435 }
5436
1643dfa4 5437 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5438 if (!parent) {
5439 ret = -ENOMEM;
124afba2 5440 goto out_err;
1f2c6651
ID
5441 }
5442
5443 /*
5444 * Images related by parent/child relationships always share
5445 * rbd_client and spec/parent_spec, so bump their refcounts.
5446 */
5447 __rbd_get_client(rbd_dev->rbd_client);
5448 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5449
6d69bb53 5450 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5451 if (ret < 0)
5452 goto out_err;
1f2c6651 5453
124afba2 5454 rbd_dev->parent = parent;
a2acd00e 5455 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5456 return 0;
1f2c6651 5457
124afba2 5458out_err:
1f2c6651 5459 rbd_dev_unparent(rbd_dev);
1761b229 5460 rbd_dev_destroy(parent);
124afba2
AE
5461 return ret;
5462}
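
/*
 * Probe-order sketch (chain hypothetical): mapping clone B with
 * ancestry base <- A <- B probes B at depth 0, A at depth 1 and base
 * at depth 2.  A chain deeper than RBD_MAX_PARENT_CHAIN_LEN fails the
 * whole map with -EINVAL before any deeper parent is probed.
 */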
5463
5769ed0c
ID
5464static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5465{
5466 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5467 rbd_dev_mapping_clear(rbd_dev);
5468 rbd_free_disk(rbd_dev);
5469 if (!single_major)
5470 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5471}
5472
811c6688
ID
5473/*
5474 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5475 * upon return.
5476 */
200a6a8b 5477static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5478{
83a06263 5479 int ret;
d1cf5788 5480
9b60e70b 5481 /* Record our major and minor device numbers. */
83a06263 5482
9b60e70b
ID
5483 if (!single_major) {
5484 ret = register_blkdev(0, rbd_dev->name);
5485 if (ret < 0)
1643dfa4 5486 goto err_out_unlock;
9b60e70b
ID
5487
5488 rbd_dev->major = ret;
5489 rbd_dev->minor = 0;
5490 } else {
5491 rbd_dev->major = rbd_major;
5492 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5493 }
83a06263
AE
5494
5495 /* Set up the blkdev mapping. */
5496
5497 ret = rbd_init_disk(rbd_dev);
5498 if (ret)
5499 goto err_out_blkdev;
5500
f35a4dee 5501 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
5502 if (ret)
5503 goto err_out_disk;
bc1ecc65 5504
f35a4dee 5505 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
9568c93e 5506 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
f35a4dee 5507
5769ed0c 5508 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
f35a4dee 5509 if (ret)
f5ee37bd 5510 goto err_out_mapping;
83a06263 5511
129b79d4 5512 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 5513 up_write(&rbd_dev->header_rwsem);
5769ed0c 5514 return 0;
2f82ee54 5515
f35a4dee
AE
5516err_out_mapping:
5517 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
5518err_out_disk:
5519 rbd_free_disk(rbd_dev);
5520err_out_blkdev:
9b60e70b
ID
5521 if (!single_major)
5522 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
5523err_out_unlock:
5524 up_write(&rbd_dev->header_rwsem);
83a06263
AE
5525 return ret;
5526}
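
/*
 * Device-number sketch (assuming rbd_dev_id_to_minor() shifts the
 * device id by RBD_SINGLE_MAJOR_PART_SHIFT): with single_major, all
 * images share one major and dev_id 2 maps to minor 32, leaving 16
 * minors for partitions of rbd2; otherwise each image registers its
 * own major and uses minor 0.
 */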
5527
332bb12d
AE
5528static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5529{
5530 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 5531 int ret;
332bb12d
AE
5532
5533 /* Record the header object name for this rbd image. */
5534
5535 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
332bb12d 5536 if (rbd_dev->image_format == 1)
c41d13a3
ID
5537 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5538 spec->image_name, RBD_SUFFIX);
332bb12d 5539 else
c41d13a3
ID
5540 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5541 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 5542
c41d13a3 5543 return ret;
332bb12d
AE
5544}
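
/*
 * Naming example (image name and id hypothetical): a format 1 image
 * named "myimage" uses header object "myimage.rbd" (name plus
 * RBD_SUFFIX), while a format 2 image with id "1f2c6651ab12" uses
 * "rbd_header.1f2c6651ab12" (RBD_HEADER_PREFIX plus id).
 */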
5545
200a6a8b
AE
5546static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5547{
6fd48b3b 5548 rbd_dev_unprobe(rbd_dev);
fd22aef8
ID
5549 if (rbd_dev->opts)
5550 rbd_unregister_watch(rbd_dev);
6fd48b3b
AE
5551 rbd_dev->image_format = 0;
5552 kfree(rbd_dev->spec->image_id);
5553 rbd_dev->spec->image_id = NULL;
200a6a8b
AE
5554}
5555
a30b71b9
AE
5556/*
5557 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
5558 * device. If this image is the one being mapped (i.e., not a
5559 * parent), initiate a watch on its header object before using that
5560 * object to get detailed information about the rbd image.
a30b71b9 5561 */
6d69bb53 5562static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
5563{
5564 int ret;
5565
5566 /*
3abef3b3
AE
5567 * Get the id from the image id object. Unless there's an
5568 * error, rbd_dev->spec->image_id will be filled in with
5569 * a dynamically-allocated string, and rbd_dev->image_format
5570 * will be set to either 1 or 2.
a30b71b9
AE
5571 */
5572 ret = rbd_dev_image_id(rbd_dev);
5573 if (ret)
c0fba368 5574 return ret;
c0fba368 5575
332bb12d
AE
5576 ret = rbd_dev_header_name(rbd_dev);
5577 if (ret)
5578 goto err_out_format;
5579
6d69bb53 5580 if (!depth) {
99d16943 5581 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
5582 if (ret) {
5583 if (ret == -ENOENT)
5584 pr_info("image %s/%s does not exist\n",
5585 rbd_dev->spec->pool_name,
5586 rbd_dev->spec->image_name);
c41d13a3 5587 goto err_out_format;
1fe48023 5588 }
1f3ef788 5589 }
b644de2b 5590
a720ae09 5591 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 5592 if (ret)
b644de2b 5593 goto err_out_watch;
83a06263 5594
04077599
ID
5595 /*
5596 * If this image is the one being mapped, we have pool name and
5597 * id, image name and id, and snap name - need to fill snap id.
5598 * Otherwise this is a parent image, identified by pool, image
5599 * and snap ids - need to fill in names for those ids.
5600 */
6d69bb53 5601 if (!depth)
04077599
ID
5602 ret = rbd_spec_fill_snap_id(rbd_dev);
5603 else
5604 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
5605 if (ret) {
5606 if (ret == -ENOENT)
5607 pr_info("snap %s/%s@%s does not exist\n",
5608 rbd_dev->spec->pool_name,
5609 rbd_dev->spec->image_name,
5610 rbd_dev->spec->snap_name);
33dca39f 5611 goto err_out_probe;
1fe48023 5612 }
9bb81c9b 5613
e8f59b59
ID
5614 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5615 ret = rbd_dev_v2_parent_info(rbd_dev);
5616 if (ret)
5617 goto err_out_probe;
5618
5619 /*
5620 * Need to warn users if this image is the one being
5621 * mapped and has a parent.
5622 */
6d69bb53 5623 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
5624 rbd_warn(rbd_dev,
5625 "WARNING: kernel layering is EXPERIMENTAL!");
5626 }
5627
6d69bb53 5628 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
5629 if (ret)
5630 goto err_out_probe;
5631
5632 dout("discovered format %u image, header name is %s\n",
c41d13a3 5633 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 5634 return 0;
e8f59b59 5635
6fd48b3b
AE
5636err_out_probe:
5637 rbd_dev_unprobe(rbd_dev);
b644de2b 5638err_out_watch:
6d69bb53 5639 if (!depth)
99d16943 5640 rbd_unregister_watch(rbd_dev);
332bb12d
AE
5641err_out_format:
5642 rbd_dev->image_format = 0;
5655c4d9
AE
5643 kfree(rbd_dev->spec->image_id);
5644 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
5645 return ret;
5646}
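
/*
 * Flow sketch for a user mapping (names hypothetical): probing
 * "rbd/myimage@snap1" resolves the image id, derives the header
 * object name, registers a watch (depth 0 only), fetches the header,
 * fills in the snapshot id for "snap1", reads parent info when the
 * layering feature is set, and finally recurses via
 * rbd_dev_probe_parent().
 */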
5647
9b60e70b
ID
5648static ssize_t do_rbd_add(struct bus_type *bus,
5649 const char *buf,
5650 size_t count)
602adf40 5651{
cb8627c7 5652 struct rbd_device *rbd_dev = NULL;
dc79b113 5653 struct ceph_options *ceph_opts = NULL;
4e9afeba 5654 struct rbd_options *rbd_opts = NULL;
859c31df 5655 struct rbd_spec *spec = NULL;
9d3997fd 5656 struct rbd_client *rbdc;
b51c83c2 5657 int rc;
602adf40
YS
5658
5659 if (!try_module_get(THIS_MODULE))
5660 return -ENODEV;
5661
602adf40 5662 /* parse add command */
859c31df 5663 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5664 if (rc < 0)
dd5ac32d 5665 goto out;
78cea76e 5666
9d3997fd
AE
5667 rbdc = rbd_get_client(ceph_opts);
5668 if (IS_ERR(rbdc)) {
5669 rc = PTR_ERR(rbdc);
0ddebc0c 5670 goto err_out_args;
9d3997fd 5671 }
602adf40 5672
602adf40 5673 /* pick the pool */
dd435855 5674 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
1fe48023
ID
5675 if (rc < 0) {
5676 if (rc == -ENOENT)
5677 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 5678 goto err_out_client;
1fe48023 5679 }
c0cd10db 5680 spec->pool_id = (u64)rc;
859c31df 5681
d147543d 5682 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
5683 if (!rbd_dev) {
5684 rc = -ENOMEM;
bd4ba655 5685 goto err_out_client;
b51c83c2 5686 }
c53d5893
AE
5687 rbdc = NULL; /* rbd_dev now owns this */
5688 spec = NULL; /* rbd_dev now owns this */
d147543d 5689 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 5690
0d6d1e9c
MC
5691 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5692 if (!rbd_dev->config_info) {
5693 rc = -ENOMEM;
5694 goto err_out_rbd_dev;
5695 }
5696
811c6688 5697 down_write(&rbd_dev->header_rwsem);
6d69bb53 5698 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
5699 if (rc < 0) {
5700 up_write(&rbd_dev->header_rwsem);
c53d5893 5701 goto err_out_rbd_dev;
0d6d1e9c 5702 }
05fd6f6f 5703
7ce4eef7 5704 /* If we are mapping a snapshot, it must be marked read-only */
7ce4eef7 5705 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9568c93e 5706 rbd_dev->opts->read_only = true;
7ce4eef7 5707
b536f69a 5708 rc = rbd_dev_device_setup(rbd_dev);
fd22aef8 5709 if (rc)
8b679ec5 5710 goto err_out_image_probe;
3abef3b3 5711
e010dd0a
ID
5712 if (rbd_dev->opts->exclusive) {
5713 rc = rbd_add_acquire_lock(rbd_dev);
5714 if (rc)
5715 goto err_out_device_setup;
3abef3b3
AE
5716 }
5717
5769ed0c
ID
5718 /* Everything's ready. Announce the disk to the world. */
5719
5720 rc = device_add(&rbd_dev->dev);
5721 if (rc)
e010dd0a 5722 goto err_out_image_lock;
5769ed0c
ID
5723
5724 add_disk(rbd_dev->disk);
5725 /* see rbd_init_disk() */
5726 blk_put_queue(rbd_dev->disk->queue);
5727
5728 spin_lock(&rbd_dev_list_lock);
5729 list_add_tail(&rbd_dev->node, &rbd_dev_list);
5730 spin_unlock(&rbd_dev_list_lock);
5731
5732 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5733 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5734 rbd_dev->header.features);
dd5ac32d
ID
5735 rc = count;
5736out:
5737 module_put(THIS_MODULE);
5738 return rc;
b536f69a 5739
e010dd0a
ID
5740err_out_image_lock:
5741 rbd_dev_image_unlock(rbd_dev);
5769ed0c
ID
5742err_out_device_setup:
5743 rbd_dev_device_release(rbd_dev);
8b679ec5
ID
5744err_out_image_probe:
5745 rbd_dev_image_release(rbd_dev);
c53d5893
AE
5746err_out_rbd_dev:
5747 rbd_dev_destroy(rbd_dev);
bd4ba655 5748err_out_client:
9d3997fd 5749 rbd_put_client(rbdc);
0ddebc0c 5750err_out_args:
859c31df 5751 rbd_spec_put(spec);
d147543d 5752 kfree(rbd_opts);
dd5ac32d 5753 goto out;
602adf40
YS
5754}
5755
9b60e70b
ID
5756static ssize_t rbd_add(struct bus_type *bus,
5757 const char *buf,
5758 size_t count)
5759{
5760 if (single_major)
5761 return -EINVAL;
5762
5763 return do_rbd_add(bus, buf, count);
5764}
5765
5766static ssize_t rbd_add_single_major(struct bus_type *bus,
5767 const char *buf,
5768 size_t count)
5769{
5770 return do_rbd_add(bus, buf, count);
5771}
5772
05a46afd
AE
5773static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5774{
ad945fc1 5775 while (rbd_dev->parent) {
05a46afd
AE
5776 struct rbd_device *first = rbd_dev;
5777 struct rbd_device *second = first->parent;
5778 struct rbd_device *third;
5779
5780 /*
5781 * Follow to the parent with no grandparent and
5782 * remove it.
5783 */
5784 while (second && (third = second->parent)) {
5785 first = second;
5786 second = third;
5787 }
ad945fc1 5788 rbd_assert(second);
8ad42cd0 5789 rbd_dev_image_release(second);
8b679ec5 5790 rbd_dev_destroy(second);
ad945fc1
AE
5791 first->parent = NULL;
5792 first->parent_overlap = 0;
5793
5794 rbd_assert(first->parent_spec);
05a46afd
AE
5795 rbd_spec_put(first->parent_spec);
5796 first->parent_spec = NULL;
05a46afd
AE
5797 }
5798}
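
/*
 * Teardown sketch (chain hypothetical): for mapped clone B with
 * ancestry base <- A <- B, each pass of the loop above releases the
 * deepest remaining ancestor first -- base, then A -- so an image is
 * only released once it no longer has a parent of its own.
 */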
5799
9b60e70b
ID
5800static ssize_t do_rbd_remove(struct bus_type *bus,
5801 const char *buf,
5802 size_t count)
602adf40
YS
5803{
5804 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
5805 struct list_head *tmp;
5806 int dev_id;
0276dca6 5807 char opt_buf[6];
82a442d2 5808 bool already = false;
0276dca6 5809 bool force = false;
0d8189e1 5810 int ret;
602adf40 5811
0276dca6
MC
5812 dev_id = -1;
5813 opt_buf[0] = '\0';
5814 sscanf(buf, "%d %5s", &dev_id, opt_buf);
5815 if (dev_id < 0) {
5816 pr_err("dev_id out of range\n");
602adf40 5817 return -EINVAL;
0276dca6
MC
5818 }
5819 if (opt_buf[0] != '\0') {
5820 if (!strcmp(opt_buf, "force")) {
5821 force = true;
5822 } else {
5823 pr_err("bad remove option at '%s'\n", opt_buf);
5824 return -EINVAL;
5825 }
5826 }
602adf40 5827
751cc0e3
AE
5828 ret = -ENOENT;
5829 spin_lock(&rbd_dev_list_lock);
5830 list_for_each(tmp, &rbd_dev_list) {
5831 rbd_dev = list_entry(tmp, struct rbd_device, node);
5832 if (rbd_dev->dev_id == dev_id) {
5833 ret = 0;
5834 break;
5835 }
42382b70 5836 }
751cc0e3
AE
5837 if (!ret) {
5838 spin_lock_irq(&rbd_dev->lock);
0276dca6 5839 if (rbd_dev->open_count && !force)
751cc0e3
AE
5840 ret = -EBUSY;
5841 else
82a442d2
AE
5842 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5843 &rbd_dev->flags);
751cc0e3
AE
5844 spin_unlock_irq(&rbd_dev->lock);
5845 }
5846 spin_unlock(&rbd_dev_list_lock);
82a442d2 5847 if (ret < 0 || already)
1ba0f1e7 5848 return ret;
751cc0e3 5849
0276dca6
MC
5850 if (force) {
5851 /*
5852 * Prevent new IO from being queued and wait for existing
5853 * IO to complete/fail.
5854 */
5855 blk_mq_freeze_queue(rbd_dev->disk->queue);
5856 blk_set_queue_dying(rbd_dev->disk->queue);
5857 }
5858
5769ed0c
ID
5859 del_gendisk(rbd_dev->disk);
5860 spin_lock(&rbd_dev_list_lock);
5861 list_del_init(&rbd_dev->node);
5862 spin_unlock(&rbd_dev_list_lock);
5863 device_del(&rbd_dev->dev);
fca27065 5864
e010dd0a 5865 rbd_dev_image_unlock(rbd_dev);
dd5ac32d 5866 rbd_dev_device_release(rbd_dev);
8ad42cd0 5867 rbd_dev_image_release(rbd_dev);
8b679ec5 5868 rbd_dev_destroy(rbd_dev);
1ba0f1e7 5869 return count;
602adf40
YS
5870}
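
/*
 * Usage sketch (device id hypothetical): writing "0" to
 * /sys/bus/rbd/remove unmaps rbd0 and returns -EBUSY while the device
 * is still open; writing "0 force" instead freezes the request queue
 * and marks it dying so outstanding I/O fails before teardown.
 */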
5871
9b60e70b
ID
5872static ssize_t rbd_remove(struct bus_type *bus,
5873 const char *buf,
5874 size_t count)
5875{
5876 if (single_major)
5877 return -EINVAL;
5878
5879 return do_rbd_remove(bus, buf, count);
5880}
5881
5882static ssize_t rbd_remove_single_major(struct bus_type *bus,
5883 const char *buf,
5884 size_t count)
5885{
5886 return do_rbd_remove(bus, buf, count);
5887}
5888
602adf40
YS
5889/*
5890 * create control files in sysfs
dfc5606d 5891 * /sys/bus/rbd/...
602adf40
YS
5892 */
5893static int rbd_sysfs_init(void)
5894{
dfc5606d 5895 int ret;
602adf40 5896
fed4c143 5897 ret = device_register(&rbd_root_dev);
21079786 5898 if (ret < 0)
dfc5606d 5899 return ret;
602adf40 5900
fed4c143
AE
5901 ret = bus_register(&rbd_bus_type);
5902 if (ret < 0)
5903 device_unregister(&rbd_root_dev);
602adf40 5904
602adf40
YS
5905 return ret;
5906}
5907
5908static void rbd_sysfs_cleanup(void)
5909{
dfc5606d 5910 bus_unregister(&rbd_bus_type);
fed4c143 5911 device_unregister(&rbd_root_dev);
602adf40
YS
5912}
5913
1c2a9dfe
AE
5914static int rbd_slab_init(void)
5915{
5916 rbd_assert(!rbd_img_request_cache);
03d94406 5917 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
5918 if (!rbd_img_request_cache)
5919 return -ENOMEM;
5920
5921 rbd_assert(!rbd_obj_request_cache);
03d94406 5922 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
5923 if (!rbd_obj_request_cache)
5924 goto out_err;
5925
6c696d85 5926 return 0;
1c2a9dfe 5927
6c696d85 5928out_err:
868311b1
AE
5929 kmem_cache_destroy(rbd_img_request_cache);
5930 rbd_img_request_cache = NULL;
1c2a9dfe
AE
5931 return -ENOMEM;
5932}
5933
5934static void rbd_slab_exit(void)
5935{
868311b1
AE
5936 rbd_assert(rbd_obj_request_cache);
5937 kmem_cache_destroy(rbd_obj_request_cache);
5938 rbd_obj_request_cache = NULL;
5939
1c2a9dfe
AE
5940 rbd_assert(rbd_img_request_cache);
5941 kmem_cache_destroy(rbd_img_request_cache);
5942 rbd_img_request_cache = NULL;
5943}
5944
cc344fa1 5945static int __init rbd_init(void)
602adf40
YS
5946{
5947 int rc;
5948
1e32d34c
AE
5949 if (!libceph_compatible(NULL)) {
5950 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
5951 return -EINVAL;
5952 }
e1b4d96d 5953
1c2a9dfe 5954 rc = rbd_slab_init();
602adf40
YS
5955 if (rc)
5956 return rc;
e1b4d96d 5957
f5ee37bd
ID
5958 /*
5959 * The number of active work items is limited by the number of
f77303bd 5960 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
5961 */
5962 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5963 if (!rbd_wq) {
5964 rc = -ENOMEM;
5965 goto err_out_slab;
5966 }
5967
9b60e70b
ID
5968 if (single_major) {
5969 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5970 if (rbd_major < 0) {
5971 rc = rbd_major;
f5ee37bd 5972 goto err_out_wq;
9b60e70b
ID
5973 }
5974 }
5975
1c2a9dfe
AE
5976 rc = rbd_sysfs_init();
5977 if (rc)
9b60e70b
ID
5978 goto err_out_blkdev;
5979
5980 if (single_major)
5981 pr_info("loaded (major %d)\n", rbd_major);
5982 else
5983 pr_info("loaded\n");
1c2a9dfe 5984
e1b4d96d
ID
5985 return 0;
5986
9b60e70b
ID
5987err_out_blkdev:
5988 if (single_major)
5989 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
5990err_out_wq:
5991 destroy_workqueue(rbd_wq);
e1b4d96d
ID
5992err_out_slab:
5993 rbd_slab_exit();
1c2a9dfe 5994 return rc;
602adf40
YS
5995}
5996
cc344fa1 5997static void __exit rbd_exit(void)
602adf40 5998{
ffe312cf 5999 ida_destroy(&rbd_dev_id_ida);
602adf40 6000 rbd_sysfs_cleanup();
9b60e70b
ID
6001 if (single_major)
6002 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6003 destroy_workqueue(rbd_wq);
1c2a9dfe 6004 rbd_slab_exit();
602adf40
YS
6005}
6006
6007module_init(rbd_init);
6008module_exit(rbd_exit);
6009
d552c619 6010MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6011MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6012MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6013/* following authorship retained from original osdblk.c */
6014MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6015
90da258b 6016MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6017MODULE_LICENSE("GPL");