rbd: refactor rbd_wait_state_locked()
drivers/block/rbd.c (linux-block.git)

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

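/*
 * Usage sketch (hypothetical caller, not from this file): together
 * these helpers implement a counter that sticks at 0 and refuses to
 * pass INT_MAX, as used below for rbd_dev->parent_ref:
 *
 *	atomic_t ref = ATOMIC_INIT(1);
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		// ... use the counted object ...
 *		if (atomic_dec_return_safe(&ref) == 0)
 *			;	// last reference, safe to tear down
 *	}
 *
 * Once the counter has reached 0, atomic_inc_return_safe() returns 0
 * without incrementing, so a stale path cannot revive the object.
 */
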
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

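/*
 * Illustrative values (hypothetical): mapping snapshot "snap1" of
 * image "foo" in pool "rbd" could yield a spec such as
 *
 *	{ .pool_id = 2, .pool_name = "rbd",
 *	  .image_id = "101a6b4b2fd7", .image_name = "foo",
 *	  .snap_id = 4, .snap_name = "snap1" }
 *
 * whereas a mapping of the image head uses .snap_id = CEPH_NOSNAP.
 */
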
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *        |     ^                              |
 *        v     \------------------------------/
 *      done
 *        ^
 *        |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

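/*
 * Simplified sketch of how the completion path drives these states
 * (hedged pseudocode; error handling omitted, the real logic lives in
 * the write completion handlers further down in this file):
 *
 *	switch (obj_req->write_state) {
 *	case RBD_OBJ_WRITE_GUARD:
 *		if (obj_req->result == -ENOENT) {
 *			// target object absent: read the parent data
 *			// and resubmit as copyup + write
 *			obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
 *			break;
 *		}
 *		// fall through -- guarded write succeeded
 *	case RBD_OBJ_WRITE_FLAT:
 *	case RBD_OBJ_WRITE_COPYUP:
 *		// request is done
 *		break;
 *	}
 */
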
struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;	/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

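/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, device id 3
 * maps to minor 48 (3 << 4), leaving minors 48..63 for the whole disk
 * plus up to 15 partitions of /dev/rbd3; minor_to_rbd_dev_id() maps
 * any of 48..63 back to 3.
 */
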
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

YS
570static int rbd_open(struct block_device *bdev, fmode_t mode)
571{
f0f8cef5 572 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
b82d167b 573 bool removing = false;
602adf40 574
a14ea269 575 spin_lock_irq(&rbd_dev->lock);
b82d167b
AE
576 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
577 removing = true;
578 else
579 rbd_dev->open_count++;
a14ea269 580 spin_unlock_irq(&rbd_dev->lock);
b82d167b
AE
581 if (removing)
582 return -ENOENT;
583
c3e946ce 584 (void) get_device(&rbd_dev->dev);
340c7a2b 585
602adf40
YS
586 return 0;
587}
588
db2a144b 589static void rbd_release(struct gendisk *disk, fmode_t mode)
dfc5606d
YS
590{
591 struct rbd_device *rbd_dev = disk->private_data;
b82d167b
AE
592 unsigned long open_count_before;
593
a14ea269 594 spin_lock_irq(&rbd_dev->lock);
b82d167b 595 open_count_before = rbd_dev->open_count--;
a14ea269 596 spin_unlock_irq(&rbd_dev->lock);
b82d167b 597 rbd_assert(open_count_before > 0);
dfc5606d 598
c3e946ce 599 put_device(&rbd_dev->dev);
dfc5606d
YS
600}
601
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

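/*
 * Example (hypothetical map request): an options string such as
 *
 *	queue_depth=128,lock_on_read
 *
 * results in this callback being invoked once per token, leaving
 * rbd_opts with .queue_depth = 128 and .lock_on_read = true while the
 * other fields keep their RBD_*_DEFAULT values.
 */
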
static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static int wait_for_latest_osdmap(struct ceph_client *client)
{
	u64 newest_epoch;
	int ret;

	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	if (client->osdc.osdmap->epoch >= newest_epoch)
		return 0;

	ceph_osdc_maybe_request_map(&client->osdc);
	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
				     client->options->mount_timeout);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = wait_for_latest_osdmap(rbdc->client);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

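/*
 * Example: given snapc->snaps = { 18, 15, 12, 7 } (descending, as the
 * OSD keeps it), a bsearch() with this comparator locates snap id 12
 * at index 2; rbd_dev_snap_index() below returns that index, or
 * BAD_SNAP_INDEX for an id (say, 13) that is not present.
 */
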
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->obj_request_count++;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
		return true;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		rbd_assert(0);
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}

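/*
 * Worked example: for img_extents { 0M~4M, 6M~2M, 10M~4M } (fe_off ~
 * fe_len, sorted by offset) and an overlap of 8M, the 10M~4M extent
 * starts at or beyond the overlap and is dropped, while 6M~2M ends
 * exactly at the overlap and is kept whole.  With an overlap of 7M
 * instead, 6M~2M would be trimmed to 6M~1M.
 */
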
/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		rbd_assert(0);
	}
}

3da691bf
ID
1720static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1721{
a162b308 1722 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
3da691bf
ID
1723 if (!obj_req->osd_req)
1724 return -ENOMEM;
2a842aca 1725
3da691bf 1726 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
43df3d35 1727 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1728 rbd_osd_req_setup_data(obj_req, 0);
7ad18afa 1729
1730 rbd_osd_req_format_read(obj_req);
1731 return 0;
1732}
1733
1734static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1735 unsigned int which)
1736{
1737 struct page **pages;
8b3e1a56 1738
1739 /*
1740 * The response data for a STAT call consists of:
1741 * le64 length;
1742 * struct {
1743 * le32 tv_sec;
1744 * le32 tv_nsec;
1745 * } mtime;
1746 */
1747 pages = ceph_alloc_page_vector(1, GFP_NOIO);
1748 if (IS_ERR(pages))
1749 return PTR_ERR(pages);
1750
1751 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1752 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1753 8 + sizeof(struct ceph_timespec),
1754 0, false, true);
1755 return 0;
1756}
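/*
 * Illustrative sketch, not part of rbd.c: the reply described in the
 * comment above is 16 bytes, which is what the
 * 8 + sizeof(struct ceph_timespec) length in the call expresses.
 */
static inline void rbd_stat_reply_size_sketch(void)
{
        /* le64 length (8) + le32 tv_sec (4) + le32 tv_nsec (4) */
        BUILD_BUG_ON(8 + sizeof(struct ceph_timespec) != 16);
}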
1757
1758static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1759 unsigned int which)
2169238d 1760{
1761 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1762 u16 opcode;
2169238d 1763
1764 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1765 rbd_dev->layout.object_size,
1766 rbd_dev->layout.object_size);
2169238d 1767
1768 if (rbd_obj_is_entire(obj_req))
1769 opcode = CEPH_OSD_OP_WRITEFULL;
1770 else
1771 opcode = CEPH_OSD_OP_WRITE;
2169238d 1772
3da691bf 1773 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
43df3d35 1774 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1775 rbd_osd_req_setup_data(obj_req, which++);
2169238d 1776
1777 rbd_assert(which == obj_req->osd_req->r_num_ops);
1778 rbd_osd_req_format_write(obj_req);
1779}
2169238d 1780
1781static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1782{
1783 unsigned int num_osd_ops, which = 0;
1784 int ret;
1785
1786 /* reverse map the entire object onto the parent */
1787 ret = rbd_obj_calc_img_extents(obj_req, true);
1788 if (ret)
1789 return ret;
1790
1791 if (obj_req->num_img_extents) {
1792 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1793 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1794 } else {
1795 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1796 num_osd_ops = 2; /* setallochint + write/writefull */
1797 }
1798
a162b308 1799 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1800 if (!obj_req->osd_req)
1801 return -ENOMEM;
2169238d 1802
86bd7998 1803 if (obj_req->num_img_extents) {
1804 ret = __rbd_obj_setup_stat(obj_req, which++);
1805 if (ret)
1806 return ret;
1807 }
1808
1809 __rbd_obj_setup_write(obj_req, which);
1810 return 0;
1811}
1812
1813static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1814 unsigned int which)
1815{
1816 u16 opcode;
1817
3da691bf 1818 if (rbd_obj_is_entire(obj_req)) {
86bd7998 1819 if (obj_req->num_img_extents) {
1820 osd_req_op_init(obj_req->osd_req, which++,
1821 CEPH_OSD_OP_CREATE, 0);
1822 opcode = CEPH_OSD_OP_TRUNCATE;
1823 } else {
1824 osd_req_op_init(obj_req->osd_req, which++,
1825 CEPH_OSD_OP_DELETE, 0);
1826 opcode = 0;
3b434a2a 1827 }
1828 } else if (rbd_obj_is_tail(obj_req)) {
1829 opcode = CEPH_OSD_OP_TRUNCATE;
3b434a2a 1830 } else {
3da691bf 1831 opcode = CEPH_OSD_OP_ZERO;
1832 }
1833
1834 if (opcode)
1835 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
43df3d35 1836 obj_req->ex.oe_off, obj_req->ex.oe_len,
1837 0, 0);
1838
1839 rbd_assert(which == obj_req->osd_req->r_num_ops);
1840 rbd_osd_req_format_write(obj_req);
1841}
1842
3da691bf 1843static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
bf0d5f50 1844{
1845 unsigned int num_osd_ops, which = 0;
1846 int ret;
37206ee5 1847
1848 /* reverse map the entire object onto the parent */
1849 ret = rbd_obj_calc_img_extents(obj_req, true);
1850 if (ret)
1851 return ret;
f1a4739f 1852
1853 if (rbd_obj_is_entire(obj_req)) {
1854 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1855 if (obj_req->num_img_extents)
1856 num_osd_ops = 2; /* create + truncate */
1857 else
1858 num_osd_ops = 1; /* delete */
3da691bf 1859 } else {
86bd7998 1860 if (obj_req->num_img_extents) {
1861 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1862 num_osd_ops = 2; /* stat + truncate/zero */
1863 } else {
1864 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1865 num_osd_ops = 1; /* truncate/zero */
1866 }
1867 }
1868
a162b308 1869 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1870 if (!obj_req->osd_req)
1871 return -ENOMEM;
bf0d5f50 1872
86bd7998 1873 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1874 ret = __rbd_obj_setup_stat(obj_req, which++);
1875 if (ret)
1876 return ret;
1877 }
3b434a2a 1878
1879 __rbd_obj_setup_discard(obj_req, which);
1880 return 0;
1881}
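/*
 * Illustrative note, not part of rbd.c: the discard cases above reduce
 * to
 *
 *   entire object, parent data  -> create + truncate   (flat)
 *   entire object, no parent    -> delete              (flat)
 *   tail of object              -> [stat +] truncate
 *   middle of object            -> [stat +] zero
 *
 * where the stat guard is added only when parent data exists and a
 * copyup may therefore be needed.
 */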
9d4df01f 1882
1883/*
1884 * For each object request in @img_req, allocate an OSD request, add
1885 * individual OSD ops and prepare them for submission. The number of
1886 * OSD ops depends on op_type and the overlap point (if any).
1887 */
1888static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1889{
1890 struct rbd_obj_request *obj_req;
1891 int ret;
430c28c3 1892
3da691bf 1893 for_each_obj_request(img_req, obj_req) {
9bb0248d 1894 switch (img_req->op_type) {
1895 case OBJ_OP_READ:
1896 ret = rbd_obj_setup_read(obj_req);
1897 break;
1898 case OBJ_OP_WRITE:
1899 ret = rbd_obj_setup_write(obj_req);
1900 break;
1901 case OBJ_OP_DISCARD:
1902 ret = rbd_obj_setup_discard(obj_req);
1903 break;
1904 default:
1905 rbd_assert(0);
1906 }
1907 if (ret)
1908 return ret;
1909 }
1910
1911 return 0;
3da691bf 1912}
bf0d5f50 1913
1914union rbd_img_fill_iter {
1915 struct ceph_bio_iter bio_iter;
1916 struct ceph_bvec_iter bvec_iter;
1917};
bf0d5f50 1918
1919struct rbd_img_fill_ctx {
1920 enum obj_request_type pos_type;
1921 union rbd_img_fill_iter *pos;
1922 union rbd_img_fill_iter iter;
1923 ceph_object_extent_fn_t set_pos_fn;
1924 ceph_object_extent_fn_t count_fn;
1925 ceph_object_extent_fn_t copy_fn;
5a237819 1926};
bf0d5f50 1927
5a237819 1928static struct ceph_object_extent *alloc_object_extent(void *arg)
0eefd470 1929{
1930 struct rbd_img_request *img_req = arg;
1931 struct rbd_obj_request *obj_req;
0eefd470 1932
1933 obj_req = rbd_obj_request_create();
1934 if (!obj_req)
1935 return NULL;
2761713d 1936
1937 rbd_img_obj_request_add(img_req, obj_req);
1938 return &obj_req->ex;
1939}
0eefd470 1940
1941/*
1942 * While su != os && sc == 1 is technically not fancy (it's the same
1943 * layout as su == os && sc == 1), we can't use the nocopy path for it
1944 * because ->set_pos_fn() should be called only once per object.
1945 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1946 * treat su != os && sc == 1 as fancy.
1947 */
1948static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1949{
1950 return l->stripe_unit != l->object_size;
1951}
0eefd470 1952
1953static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1954 struct ceph_file_extent *img_extents,
1955 u32 num_img_extents,
1956 struct rbd_img_fill_ctx *fctx)
1957{
1958 u32 i;
1959 int ret;
1960
1961 img_req->data_type = fctx->pos_type;
1962
1963 /*
1964 * Create object requests and set each object request's starting
1965 * position in the provided bio (list) or bio_vec array.
0eefd470 1966 */
1967 fctx->iter = *fctx->pos;
1968 for (i = 0; i < num_img_extents; i++) {
1969 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
1970 img_extents[i].fe_off,
1971 img_extents[i].fe_len,
1972 &img_req->object_extents,
1973 alloc_object_extent, img_req,
1974 fctx->set_pos_fn, &fctx->iter);
1975 if (ret)
1976 return ret;
1977 }
0eefd470 1978
afb97888 1979 return __rbd_img_fill_request(img_req);
1980}
1981
1982/*
1983 * Map a list of image extents to a list of object extents, create the
1984 * corresponding object requests (normally each to a different object,
1985 * but not always) and add them to @img_req. For each object request,
afb97888 1986 * set up its data descriptor to point to the corresponding chunk(s) of
1987 * @fctx->pos data buffer.
1988 *
1989 * Because ceph_file_to_extents() will merge adjacent object extents
1990 * together, each object request's data descriptor may point to multiple
1991 * different chunks of @fctx->pos data buffer.
1992 *
1993 * @fctx->pos data buffer is assumed to be large enough.
1994 */
1995static int rbd_img_fill_request(struct rbd_img_request *img_req,
1996 struct ceph_file_extent *img_extents,
1997 u32 num_img_extents,
1998 struct rbd_img_fill_ctx *fctx)
3d7efd18 1999{
2000 struct rbd_device *rbd_dev = img_req->rbd_dev;
2001 struct rbd_obj_request *obj_req;
2002 u32 i;
2003 int ret;
2004
2005 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2006 !rbd_layout_is_fancy(&rbd_dev->layout))
2007 return rbd_img_fill_request_nocopy(img_req, img_extents,
2008 num_img_extents, fctx);
3d7efd18 2009
afb97888 2010 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
0eefd470 2011
bbea1c1a 2012 /*
2013 * Create object requests and determine ->bvec_count for each object
2014 * request. Note that ->bvec_count sum over all object requests may
2015 * be greater than the number of bio_vecs in the provided bio (list)
2016 * or bio_vec array because when mapped, those bio_vecs can straddle
2017 * stripe unit boundaries.
bbea1c1a 2018 */
2019 fctx->iter = *fctx->pos;
2020 for (i = 0; i < num_img_extents; i++) {
afb97888 2021 ret = ceph_file_to_extents(&rbd_dev->layout,
2022 img_extents[i].fe_off,
2023 img_extents[i].fe_len,
2024 &img_req->object_extents,
2025 alloc_object_extent, img_req,
2026 fctx->count_fn, &fctx->iter);
2027 if (ret)
2028 return ret;
bbea1c1a 2029 }
0eefd470 2030
2031 for_each_obj_request(img_req, obj_req) {
2032 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2033 sizeof(*obj_req->bvec_pos.bvecs),
2034 GFP_NOIO);
2035 if (!obj_req->bvec_pos.bvecs)
2036 return -ENOMEM;
2037 }
0eefd470 2038
8785b1d4 2039 /*
2040 * Fill in each object request's private bio_vec array, splitting and
2041 * rearranging the provided bio_vecs in stripe unit chunks as needed.
8785b1d4 2042 */
2043 fctx->iter = *fctx->pos;
2044 for (i = 0; i < num_img_extents; i++) {
2045 ret = ceph_iterate_extents(&rbd_dev->layout,
2046 img_extents[i].fe_off,
2047 img_extents[i].fe_len,
2048 &img_req->object_extents,
2049 fctx->copy_fn, &fctx->iter);
2050 if (ret)
2051 return ret;
2052 }
3d7efd18 2053
2054 return __rbd_img_fill_request(img_req);
2055}
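/*
 * Illustrative note, not part of rbd.c: for fancy layouts the fill is
 * effectively three passes over the image extents -- count_fn sizes
 * each object request's bvec array, the arrays are allocated, then
 * copy_fn splits the source bio_vecs into them along stripe unit
 * boundaries.
 */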
2056
2057static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2058 u64 off, u64 len)
2059{
2060 struct ceph_file_extent ex = { off, len };
2061 union rbd_img_fill_iter dummy;
2062 struct rbd_img_fill_ctx fctx = {
2063 .pos_type = OBJ_REQUEST_NODATA,
2064 .pos = &dummy,
2065 };
2066
2067 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2068}
2069
2070static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2071{
2072 struct rbd_obj_request *obj_req =
2073 container_of(ex, struct rbd_obj_request, ex);
2074 struct ceph_bio_iter *it = arg;
3d7efd18 2075
2076 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2077 obj_req->bio_pos = *it;
2078 ceph_bio_iter_advance(it, bytes);
2079}
3d7efd18 2080
2081static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2082{
2083 struct rbd_obj_request *obj_req =
2084 container_of(ex, struct rbd_obj_request, ex);
2085 struct ceph_bio_iter *it = arg;
0eefd470 2086
2087 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2088 ceph_bio_iter_advance_step(it, bytes, ({
2089 obj_req->bvec_count++;
2090 }));
0eefd470 2091
afb97888 2092}
0eefd470 2093
2094static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2095{
2096 struct rbd_obj_request *obj_req =
2097 container_of(ex, struct rbd_obj_request, ex);
2098 struct ceph_bio_iter *it = arg;
0eefd470 2099
2100 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2101 ceph_bio_iter_advance_step(it, bytes, ({
2102 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2103 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2104 }));
2105}
2106
2107static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2108 struct ceph_file_extent *img_extents,
2109 u32 num_img_extents,
2110 struct ceph_bio_iter *bio_pos)
2111{
2112 struct rbd_img_fill_ctx fctx = {
2113 .pos_type = OBJ_REQUEST_BIO,
2114 .pos = (union rbd_img_fill_iter *)bio_pos,
2115 .set_pos_fn = set_bio_pos,
2116 .count_fn = count_bio_bvecs,
2117 .copy_fn = copy_bio_bvecs,
5a237819 2118 };
3d7efd18 2119
2120 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2121 &fctx);
2122}
3d7efd18 2123
2124static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2125 u64 off, u64 len, struct bio *bio)
2126{
2127 struct ceph_file_extent ex = { off, len };
2128 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
3d7efd18 2129
2130 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2131}
a9e8ba2c 2132
2133static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2134{
2135 struct rbd_obj_request *obj_req =
2136 container_of(ex, struct rbd_obj_request, ex);
2137 struct ceph_bvec_iter *it = arg;
3d7efd18 2138
2139 obj_req->bvec_pos = *it;
2140 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2141 ceph_bvec_iter_advance(it, bytes);
2142}
3d7efd18 2143
2144static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2145{
2146 struct rbd_obj_request *obj_req =
2147 container_of(ex, struct rbd_obj_request, ex);
2148 struct ceph_bvec_iter *it = arg;
058aa991 2149
2150 ceph_bvec_iter_advance_step(it, bytes, ({
2151 obj_req->bvec_count++;
2152 }));
2153}
058aa991 2154
2155static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2156{
2157 struct rbd_obj_request *obj_req =
2158 container_of(ex, struct rbd_obj_request, ex);
2159 struct ceph_bvec_iter *it = arg;
3d7efd18 2160
2161 ceph_bvec_iter_advance_step(it, bytes, ({
2162 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2163 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2164 }));
2165}
2166
2167static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2168 struct ceph_file_extent *img_extents,
2169 u32 num_img_extents,
2170 struct ceph_bvec_iter *bvec_pos)
c5b5ef6c 2171{
2172 struct rbd_img_fill_ctx fctx = {
2173 .pos_type = OBJ_REQUEST_BVECS,
2174 .pos = (union rbd_img_fill_iter *)bvec_pos,
2175 .set_pos_fn = set_bvec_pos,
2176 .count_fn = count_bvecs,
2177 .copy_fn = copy_bvecs,
5a237819 2178 };
c5b5ef6c 2179
2180 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2181 &fctx);
2182}
c5b5ef6c 2183
2184static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2185 struct ceph_file_extent *img_extents,
2186 u32 num_img_extents,
2187 struct bio_vec *bvecs)
2188{
2189 struct ceph_bvec_iter it = {
2190 .bvecs = bvecs,
2191 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2192 num_img_extents) },
2193 };
c5b5ef6c 2194
2195 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2196 &it);
2197}
c5b5ef6c 2198
efbd1a11 2199static void rbd_img_request_submit(struct rbd_img_request *img_request)
bf0d5f50 2200{
bf0d5f50 2201 struct rbd_obj_request *obj_request;
c5b5ef6c 2202
37206ee5 2203 dout("%s: img %p\n", __func__, img_request);
c2e82414 2204
663ae2cc 2205 rbd_img_request_get(img_request);
efbd1a11 2206 for_each_obj_request(img_request, obj_request)
3da691bf 2207 rbd_obj_request_submit(obj_request);
c2e82414 2208
663ae2cc 2209 rbd_img_request_put(img_request);
2210}
2211
86bd7998 2212static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
c5b5ef6c 2213{
2214 struct rbd_img_request *img_req = obj_req->img_request;
2215 struct rbd_img_request *child_img_req;
2216 int ret;
2217
2218 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2219 OBJ_OP_READ, NULL);
3da691bf 2220 if (!child_img_req)
2221 return -ENOMEM;
2222
2223 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2224 child_img_req->obj_request = obj_req;
a90bb0c1 2225
3da691bf 2226 if (!rbd_img_is_write(img_req)) {
ecc633ca 2227 switch (img_req->data_type) {
3da691bf 2228 case OBJ_REQUEST_BIO:
2229 ret = __rbd_img_fill_from_bio(child_img_req,
2230 obj_req->img_extents,
2231 obj_req->num_img_extents,
2232 &obj_req->bio_pos);
2233 break;
2234 case OBJ_REQUEST_BVECS:
afb97888 2235 case OBJ_REQUEST_OWN_BVECS:
2236 ret = __rbd_img_fill_from_bvecs(child_img_req,
2237 obj_req->img_extents,
2238 obj_req->num_img_extents,
2239 &obj_req->bvec_pos);
2240 break;
2241 default:
2242 rbd_assert(0);
2243 }
2244 } else {
2245 ret = rbd_img_fill_from_bvecs(child_img_req,
2246 obj_req->img_extents,
2247 obj_req->num_img_extents,
2248 obj_req->copyup_bvecs);
2249 }
2250 if (ret) {
2251 rbd_img_request_put(child_img_req);
2252 return ret;
2253 }
2254
2255 rbd_img_request_submit(child_img_req);
2256 return 0;
2257}
2258
2259static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2260{
2261 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2262 int ret;
2263
2264 if (obj_req->result == -ENOENT &&
2265 rbd_dev->parent_overlap && !obj_req->tried_parent) {
2266 /* reverse map this object extent onto the parent */
2267 ret = rbd_obj_calc_img_extents(obj_req, false);
2268 if (ret) {
2269 obj_req->result = ret;
2270 return true;
2271 }
2272
2273 if (obj_req->num_img_extents) {
2274 obj_req->tried_parent = true;
2275 ret = rbd_obj_read_from_parent(obj_req);
2276 if (ret) {
2277 obj_req->result = ret;
2278 return true;
2279 }
2280 return false;
2281 }
2282 }
2283
c5b5ef6c 2284 /*
2285 * -ENOENT means a hole in the image -- zero-fill the entire
2286 * length of the request. A short read also implies zero-fill
2287 * to the end of the request. In both cases we update xferred
2288 * count to indicate the whole request was satisfied.
c5b5ef6c 2289 */
3da691bf 2290 if (obj_req->result == -ENOENT ||
43df3d35 2291 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2292 rbd_assert(!obj_req->xferred || !obj_req->result);
2293 rbd_obj_zero_range(obj_req, obj_req->xferred,
43df3d35 2294 obj_req->ex.oe_len - obj_req->xferred);
3da691bf 2295 obj_req->result = 0;
43df3d35 2296 obj_req->xferred = obj_req->ex.oe_len;
710214e3 2297 }
c5b5ef6c 2298
2299 return true;
2300}
c5b5ef6c 2301
2302/*
2303 * copyup_bvecs pages are never highmem pages
2304 */
2305static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2306{
2307 struct ceph_bvec_iter it = {
2308 .bvecs = bvecs,
2309 .iter = { .bi_size = bytes },
2310 };
c5b5ef6c 2311
2312 ceph_bvec_iter_advance_step(&it, bytes, ({
2313 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2314 bv.bv_len))
2315 return false;
2316 }));
2317 return true;
2318}
2319
3da691bf 2320static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
b454e36d 2321{
3da691bf 2322 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
70d045f6 2323
2324 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2325 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2326 rbd_osd_req_destroy(obj_req->osd_req);
70d045f6 2327
b454e36d 2328 /*
2329 * Create a copyup request with the same number of OSD ops as
2330 * the original request. The original request was stat + op(s),
2331 * the new copyup request will be copyup + the same op(s).
b454e36d 2332 */
a162b308 2333 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2334 if (!obj_req->osd_req)
2335 return -ENOMEM;
b454e36d 2336
c622d226 2337 /*
2338 * Only send non-zero copyup data to save some I/O and network
2339 * bandwidth -- zero copyup data is equivalent to the object not
2340 * existing.
c622d226 2341 */
2342 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2343 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2344 bytes = 0;
2345 }
c622d226 2346
2347 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2348 "copyup");
2349 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2350 obj_req->copyup_bvecs, bytes);
2351
9bb0248d 2352 switch (obj_req->img_request->op_type) {
2353 case OBJ_OP_WRITE:
2354 __rbd_obj_setup_write(obj_req, 1);
2355 break;
2356 case OBJ_OP_DISCARD:
2357 rbd_assert(!rbd_obj_is_entire(obj_req));
2358 __rbd_obj_setup_discard(obj_req, 1);
2359 break;
2360 default:
2361 rbd_assert(0);
2362 }
70d045f6 2363
3da691bf 2364 rbd_obj_request_submit(obj_req);
3da691bf 2365 return 0;
2366}
2367
7e07efb1 2368static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
70d045f6 2369{
7e07efb1 2370 u32 i;
b454e36d 2371
2372 rbd_assert(!obj_req->copyup_bvecs);
2373 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2374 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2375 sizeof(*obj_req->copyup_bvecs),
2376 GFP_NOIO);
2377 if (!obj_req->copyup_bvecs)
2378 return -ENOMEM;
b454e36d 2379
2380 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2381 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2382
2383 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2384 if (!obj_req->copyup_bvecs[i].bv_page)
2385 return -ENOMEM;
3d7efd18 2386
2387 obj_req->copyup_bvecs[i].bv_offset = 0;
2388 obj_req->copyup_bvecs[i].bv_len = len;
2389 obj_overlap -= len;
2390 }
b454e36d 2391
2392 rbd_assert(!obj_overlap);
2393 return 0;
2394}
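/*
 * Illustrative sketch, not part of rbd.c: the page-sized splitting done
 * by setup_copyup_bvecs() above. For obj_overlap = 9000 and a 4096-byte
 * PAGE_SIZE, calc_pages_for(0, 9000) yields 3 bvecs with lengths 4096,
 * 4096 and 808.
 */
static inline u32 copyup_bvec_lens_sketch(u64 obj_overlap, u32 *lens,
                                          u32 max_bvecs)
{
        u32 i = 0;

        while (obj_overlap && i < max_bvecs) {
                lens[i] = min(obj_overlap, (u64)PAGE_SIZE);
                obj_overlap -= lens[i++];
        }

        return i; /* matches calc_pages_for(0, original overlap) */
}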
2395
3da691bf 2396static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
bf0d5f50 2397{
3da691bf 2398 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3da691bf 2399 int ret;
bf0d5f50 2400
2401 rbd_assert(obj_req->num_img_extents);
2402 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2403 rbd_dev->parent_overlap);
2404 if (!obj_req->num_img_extents) {
2405 /*
2406 * The overlap has become 0 (most likely because the
2407 * image has been flattened). Use rbd_obj_issue_copyup()
2408 * to re-submit the original write request -- the copyup
2409 * operation itself will be a no-op, since someone must
2410 * have populated the child object while we weren't
2411 * looking. Move to WRITE_FLAT state as we'll be done
2412 * with the operation once the null copyup completes.
2413 */
2414 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2415 return rbd_obj_issue_copyup(obj_req, 0);
2416 }
2417
86bd7998 2418 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2419 if (ret)
2420 return ret;
2421
2422 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
86bd7998 2423 return rbd_obj_read_from_parent(obj_req);
bf0d5f50 2424}
8b3e1a56 2425
3da691bf 2426static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
8b3e1a56 2427{
3da691bf 2428 int ret;
8b3e1a56 2429
2430again:
2431 switch (obj_req->write_state) {
2432 case RBD_OBJ_WRITE_GUARD:
2433 rbd_assert(!obj_req->xferred);
2434 if (obj_req->result == -ENOENT) {
2435 /*
2436 * The target object doesn't exist. Read the data for
2437 * the entire target object up to the overlap point (if
2438 * any) from the parent, so we can use it for a copyup.
2439 */
2440 ret = rbd_obj_handle_write_guard(obj_req);
2441 if (ret) {
2442 obj_req->result = ret;
2443 return true;
2444 }
2445 return false;
2446 }
2447 /* fall through */
2448 case RBD_OBJ_WRITE_FLAT:
2449 if (!obj_req->result)
2450 /*
2451 * There is no such thing as a successful short
2452 * write -- indicate the whole request was satisfied.
2453 */
43df3d35 2454 obj_req->xferred = obj_req->ex.oe_len;
2455 return true;
2456 case RBD_OBJ_WRITE_COPYUP:
2457 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2458 if (obj_req->result)
2459 goto again;
8b3e1a56 2460
2461 rbd_assert(obj_req->xferred);
2462 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2463 if (ret) {
2464 obj_req->result = ret;
2465 return true;
2466 }
2467 return false;
2468 default:
2469 rbd_assert(0);
2470 }
2471}
02c74fba 2472
2473/*
2474 * Returns true if @obj_req is completed, or false otherwise.
2475 */
2476static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2477{
9bb0248d 2478 switch (obj_req->img_request->op_type) {
2479 case OBJ_OP_READ:
2480 return rbd_obj_handle_read(obj_req);
2481 case OBJ_OP_WRITE:
2482 return rbd_obj_handle_write(obj_req);
2483 case OBJ_OP_DISCARD:
2484 if (rbd_obj_handle_write(obj_req)) {
2485 /*
2486 * Hide -ENOENT from delete/truncate/zero -- discarding
2487 * a non-existent object is not a problem.
2488 */
2489 if (obj_req->result == -ENOENT) {
2490 obj_req->result = 0;
43df3d35 2491 obj_req->xferred = obj_req->ex.oe_len;
2492 }
2493 return true;
2494 }
2495 return false;
2496 default:
2497 rbd_assert(0);
2498 }
2499}
02c74fba 2500
2501static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2502{
2503 struct rbd_img_request *img_req = obj_req->img_request;
2504
2505 rbd_assert((!obj_req->result &&
43df3d35 2506 obj_req->xferred == obj_req->ex.oe_len) ||
2507 (obj_req->result < 0 && !obj_req->xferred));
2508 if (!obj_req->result) {
2509 img_req->xferred += obj_req->xferred;
980917fc 2510 return;
02c74fba 2511 }
a9e8ba2c 2512
2513 rbd_warn(img_req->rbd_dev,
2514 "%s at objno %llu %llu~%llu result %d xferred %llu",
2515 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2516 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2517 obj_req->xferred);
2518 if (!img_req->result) {
2519 img_req->result = obj_req->result;
2520 img_req->xferred = 0;
2521 }
2522}
a9e8ba2c 2523
2524static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2525{
2526 struct rbd_obj_request *obj_req = img_req->obj_request;
a9e8ba2c 2527
3da691bf 2528 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2529 rbd_assert((!img_req->result &&
2530 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2531 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2532
2533 obj_req->result = img_req->result;
2534 obj_req->xferred = img_req->xferred;
2535 rbd_img_request_put(img_req);
2536}
2537
7114edac 2538static void rbd_img_end_request(struct rbd_img_request *img_req)
8b3e1a56 2539{
2540 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2541 rbd_assert((!img_req->result &&
2542 img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2543 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2544
2545 blk_mq_end_request(img_req->rq,
2546 errno_to_blk_status(img_req->result));
2547 rbd_img_request_put(img_req);
3da691bf 2548}
8b3e1a56 2549
2550static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2551{
7114edac 2552 struct rbd_img_request *img_req;
8b3e1a56 2553
7114edac 2554again:
2555 if (!__rbd_obj_handle_request(obj_req))
2556 return;
8b3e1a56 2557
2558 img_req = obj_req->img_request;
2559 spin_lock(&img_req->completion_lock);
2560 rbd_obj_end_request(obj_req);
2561 rbd_assert(img_req->pending_count);
2562 if (--img_req->pending_count) {
2563 spin_unlock(&img_req->completion_lock);
2564 return;
2565 }
8b3e1a56 2566
2567 spin_unlock(&img_req->completion_lock);
2568 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2569 obj_req = img_req->obj_request;
2570 rbd_img_end_child_request(img_req);
2571 goto again;
2572 }
2573 rbd_img_end_request(img_req);
8b3e1a56 2574}
bf0d5f50 2575
ed95b21a 2576static const struct rbd_client_id rbd_empty_cid;
b8d70035 2577
2578static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2579 const struct rbd_client_id *rhs)
2580{
2581 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2582}
2583
2584static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2585{
2586 struct rbd_client_id cid;
2587
2588 mutex_lock(&rbd_dev->watch_mutex);
2589 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2590 cid.handle = rbd_dev->watch_cookie;
2591 mutex_unlock(&rbd_dev->watch_mutex);
2592 return cid;
2593}
2594
2595/*
2596 * lock_rwsem must be held for write
2597 */
2598static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2599 const struct rbd_client_id *cid)
2600{
2601 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2602 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2603 cid->gid, cid->handle);
2604 rbd_dev->owner_cid = *cid; /* struct */
2605}
2606
2607static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2608{
2609 mutex_lock(&rbd_dev->watch_mutex);
2610 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2611 mutex_unlock(&rbd_dev->watch_mutex);
2612}
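/*
 * Illustrative note, not part of rbd.c: RBD_LOCK_COOKIE_PREFIX is
 * "auto", so for a watch cookie of 94587 the buffer ends up holding
 * the string "auto 94587". find_watcher() later parses such cookies
 * back with the matching sscanf() format.
 */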
2613
2614static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2615{
2616 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2617
2618 strcpy(rbd_dev->lock_cookie, cookie);
2619 rbd_set_owner_cid(rbd_dev, &cid);
2620 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2621}
2622
2623/*
2624 * lock_rwsem must be held for write
2625 */
2626static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 2627{
922dab61 2628 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2629 char cookie[32];
e627db08 2630 int ret;
b8d70035 2631
2632 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2633 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 2634
2635 format_lock_cookie(rbd_dev, cookie);
2636 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2637 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2638 RBD_LOCK_TAG, "", 0);
e627db08 2639 if (ret)
ed95b21a 2640 return ret;
b8d70035 2641
ed95b21a 2642 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
edd8ca80 2643 __rbd_lock(rbd_dev, cookie);
ed95b21a 2644 return 0;
2645}
2646
2647/*
2648 * lock_rwsem must be held for write
2649 */
bbead745 2650static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 2651{
922dab61 2652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2653 int ret;
2654
2655 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2656 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 2657
ed95b21a 2658 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 2659 RBD_LOCK_NAME, rbd_dev->lock_cookie);
2660 if (ret && ret != -ENOENT)
2661 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 2662
2663 /* treat errors as the image is unlocked */
2664 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 2665 rbd_dev->lock_cookie[0] = '\0';
2666 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2667 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2668}
2669
2670static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2671 enum rbd_notify_op notify_op,
2672 struct page ***preply_pages,
2673 size_t *preply_len)
2674{
2675 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2676 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2677 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2678 int buf_size = sizeof(buf);
ed95b21a 2679 void *p = buf;
9969ebc5 2680
ed95b21a 2681 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 2682
2683 /* encode *LockPayload NotifyMessage (op + ClientId) */
2684 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2685 ceph_encode_32(&p, notify_op);
2686 ceph_encode_64(&p, cid.gid);
2687 ceph_encode_64(&p, cid.handle);
8eb87565 2688
2689 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2690 &rbd_dev->header_oloc, buf, buf_size,
2691 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
2692}
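/*
 * Illustrative sketch, not part of rbd.c: the wire layout produced
 * above, assuming CEPH_ENCODING_START_BLK_LEN is the usual 6-byte
 * (struct_v, struct_compat, struct_len) encoding header, making the
 * buffer 6 + 4 + 8 + 8 = 26 bytes in total.
 */
struct rbd_lock_notify_sketch {
        u8     struct_v;        /* 2 */
        u8     struct_compat;   /* 1 */
        __le32 struct_len;      /* 20: notify_op + gid + handle */
        __le32 notify_op;
        __le64 gid;
        __le64 handle;
} __packed;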
2693
2694static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2695 enum rbd_notify_op notify_op)
b30a01f2 2696{
2697 struct page **reply_pages;
2698 size_t reply_len;
b30a01f2 2699
2700 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2701 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2702}
b30a01f2 2703
2704static void rbd_notify_acquired_lock(struct work_struct *work)
2705{
2706 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2707 acquired_lock_work);
76756a51 2708
ed95b21a 2709 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2710}
2711
ed95b21a 2712static void rbd_notify_released_lock(struct work_struct *work)
c525f036 2713{
2714 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2715 released_lock_work);
811c6688 2716
ed95b21a 2717 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2718}
2719
ed95b21a 2720static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 2721{
2722 struct page **reply_pages;
2723 size_t reply_len;
2724 bool lock_owner_responded = false;
2725 int ret;
2726
ed95b21a 2727 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 2728
2729 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2730 &reply_pages, &reply_len);
2731 if (ret && ret != -ETIMEDOUT) {
2732 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 2733 goto out;
ed95b21a 2734 }
36be9a76 2735
2736 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2737 void *p = page_address(reply_pages[0]);
2738 void *const end = p + reply_len;
2739 u32 n;
36be9a76 2740
2741 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2742 while (n--) {
2743 u8 struct_v;
2744 u32 len;
36be9a76 2745
2746 ceph_decode_need(&p, end, 8 + 8, e_inval);
2747 p += 8 + 8; /* skip gid and cookie */
04017e29 2748
2749 ceph_decode_32_safe(&p, end, len, e_inval);
2750 if (!len)
2751 continue;
2752
2753 if (lock_owner_responded) {
2754 rbd_warn(rbd_dev,
2755 "duplicate lock owners detected");
2756 ret = -EIO;
2757 goto out;
2758 }
2759
2760 lock_owner_responded = true;
2761 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2762 &struct_v, &len);
2763 if (ret) {
2764 rbd_warn(rbd_dev,
2765 "failed to decode ResponseMessage: %d",
2766 ret);
2767 goto e_inval;
2768 }
2769
2770 ret = ceph_decode_32(&p);
2771 }
2772 }
2773
2774 if (!lock_owner_responded) {
2775 rbd_warn(rbd_dev, "no lock owners detected");
2776 ret = -ETIMEDOUT;
2777 }
2778
2779out:
2780 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2781 return ret;
2782
2783e_inval:
2784 ret = -EINVAL;
2785 goto out;
2786}
2787
2788static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2789{
2790 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2791
2792 cancel_delayed_work(&rbd_dev->lock_dwork);
2793 if (wake_all)
2794 wake_up_all(&rbd_dev->lock_waitq);
2795 else
2796 wake_up(&rbd_dev->lock_waitq);
2797}
2798
2799static int get_lock_owner_info(struct rbd_device *rbd_dev,
2800 struct ceph_locker **lockers, u32 *num_lockers)
2801{
2802 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2803 u8 lock_type;
2804 char *lock_tag;
2805 int ret;
2806
2807 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2808
2809 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2810 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2811 &lock_type, &lock_tag, lockers, num_lockers);
2812 if (ret)
2813 return ret;
2814
2815 if (*num_lockers == 0) {
2816 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2817 goto out;
2818 }
2819
2820 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2821 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2822 lock_tag);
2823 ret = -EBUSY;
2824 goto out;
2825 }
2826
2827 if (lock_type == CEPH_CLS_LOCK_SHARED) {
2828 rbd_warn(rbd_dev, "shared lock type detected");
2829 ret = -EBUSY;
2830 goto out;
2831 }
2832
2833 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2834 strlen(RBD_LOCK_COOKIE_PREFIX))) {
2835 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2836 (*lockers)[0].id.cookie);
2837 ret = -EBUSY;
2838 goto out;
2839 }
2840
2841out:
2842 kfree(lock_tag);
2843 return ret;
2844}
2845
2846static int find_watcher(struct rbd_device *rbd_dev,
2847 const struct ceph_locker *locker)
2848{
2849 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2850 struct ceph_watch_item *watchers;
2851 u32 num_watchers;
2852 u64 cookie;
2853 int i;
2854 int ret;
2855
2856 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2857 &rbd_dev->header_oloc, &watchers,
2858 &num_watchers);
2859 if (ret)
2860 return ret;
2861
2862 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2863 for (i = 0; i < num_watchers; i++) {
2864 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2865 sizeof(locker->info.addr)) &&
2866 watchers[i].cookie == cookie) {
2867 struct rbd_client_id cid = {
2868 .gid = le64_to_cpu(watchers[i].name.num),
2869 .handle = cookie,
2870 };
2871
2872 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2873 rbd_dev, cid.gid, cid.handle);
2874 rbd_set_owner_cid(rbd_dev, &cid);
2875 ret = 1;
2876 goto out;
2877 }
2878 }
2879
2880 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2881 ret = 0;
2882out:
2883 kfree(watchers);
2884 return ret;
2885}
2886
2887/*
2888 * lock_rwsem must be held for write
2889 */
2890static int rbd_try_lock(struct rbd_device *rbd_dev)
2891{
2892 struct ceph_client *client = rbd_dev->rbd_client->client;
2893 struct ceph_locker *lockers;
2894 u32 num_lockers;
2895 int ret;
2896
2897 for (;;) {
2898 ret = rbd_lock(rbd_dev);
2899 if (ret != -EBUSY)
2900 return ret;
2901
2902 /* determine if the current lock holder is still alive */
2903 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2904 if (ret)
2905 return ret;
2906
2907 if (num_lockers == 0)
2908 goto again;
2909
2910 ret = find_watcher(rbd_dev, lockers);
2911 if (ret) {
2912 if (ret > 0)
2913 ret = 0; /* have to request lock */
2914 goto out;
2915 }
2916
2917 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2918 ENTITY_NAME(lockers[0].id.name));
2919
2920 ret = ceph_monc_blacklist_add(&client->monc,
2921 &lockers[0].info.addr);
2922 if (ret) {
2923 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2924 ENTITY_NAME(lockers[0].id.name), ret);
2925 goto out;
2926 }
2927
2928 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2929 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2930 lockers[0].id.cookie,
2931 &lockers[0].id.name);
2932 if (ret && ret != -ENOENT)
2933 goto out;
2934
2935again:
2936 ceph_free_lockers(lockers, num_lockers);
2937 }
2938
2939out:
2940 ceph_free_lockers(lockers, num_lockers);
2941 return ret;
2942}
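/*
 * Illustrative note, not part of rbd.c: the loop above ends one of
 * three ways -- the lock is acquired (0), a live owner is found
 * (find_watcher() returns 1, reported as 0 so the caller requests the
 * lock instead), or an error is returned. A holder with no backing
 * watch is assumed dead: it is blacklisted and its lock broken before
 * the acquisition is retried.
 */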
2943
2944/*
2945 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2946 */
2947static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2948 int *pret)
2949{
2950 enum rbd_lock_state lock_state;
2951
2952 down_read(&rbd_dev->lock_rwsem);
2953 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
2954 rbd_dev->lock_state);
2955 if (__rbd_is_lock_owner(rbd_dev)) {
2956 lock_state = rbd_dev->lock_state;
2957 up_read(&rbd_dev->lock_rwsem);
2958 return lock_state;
2959 }
2960
2961 up_read(&rbd_dev->lock_rwsem);
2962 down_write(&rbd_dev->lock_rwsem);
2963 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
2964 rbd_dev->lock_state);
2965 if (!__rbd_is_lock_owner(rbd_dev)) {
2966 *pret = rbd_try_lock(rbd_dev);
2967 if (*pret)
2968 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
2969 }
2970
2971 lock_state = rbd_dev->lock_state;
2972 up_write(&rbd_dev->lock_rwsem);
2973 return lock_state;
2974}
2975
2976static void rbd_acquire_lock(struct work_struct *work)
2977{
2978 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
2979 struct rbd_device, lock_dwork);
2980 enum rbd_lock_state lock_state;
37f13252 2981 int ret = 0;
2982
2983 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2984again:
2985 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
2986 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
2987 if (lock_state == RBD_LOCK_STATE_LOCKED)
2988 wake_requests(rbd_dev, true);
2989 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
2990 rbd_dev, lock_state, ret);
2991 return;
2992 }
2993
2994 ret = rbd_request_lock(rbd_dev);
2995 if (ret == -ETIMEDOUT) {
2996 goto again; /* treat this as a dead client */
2997 } else if (ret == -EROFS) {
2998 rbd_warn(rbd_dev, "peer will not release lock");
2999 /*
3000 * If this is rbd_add_acquire_lock(), we want to fail
3001 * immediately -- reuse BLACKLISTED flag. Otherwise we
3002 * want to block.
3003 */
3004 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3005 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3006 /* wake "rbd map --exclusive" process */
3007 wake_requests(rbd_dev, false);
3008 }
3009 } else if (ret < 0) {
3010 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3011 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3012 RBD_RETRY_DELAY);
3013 } else {
3014 /*
3015 * lock owner acked, but resend if we don't see them
3016 * release the lock
3017 */
3018 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3019 rbd_dev);
3020 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3021 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3022 }
3023}
3024
3025/*
3026 * lock_rwsem must be held for write
3027 */
3028static bool rbd_release_lock(struct rbd_device *rbd_dev)
3029{
3030 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3031 rbd_dev->lock_state);
3032 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3033 return false;
3034
3035 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3036 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3037 /*
ed95b21a 3038 * Ensure that all in-flight IO is flushed.
52bb1f9b 3039 *
3040 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3041 * may be shared with other devices.
52bb1f9b 3042 */
3043 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3044 up_read(&rbd_dev->lock_rwsem);
3045
3046 down_write(&rbd_dev->lock_rwsem);
3047 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3048 rbd_dev->lock_state);
3049 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3050 return false;
3051
3052 rbd_unlock(rbd_dev);
3053 /*
3054 * Give others a chance to grab the lock - we would re-acquire
3055 * almost immediately if we got new IO during ceph_osdc_sync()
3056 * otherwise. We need to ack our own notifications, so this
3057 * lock_dwork will be requeued from rbd_wait_state_locked()
3058 * after wake_requests() in rbd_handle_released_lock().
3059 */
3060 cancel_delayed_work(&rbd_dev->lock_dwork);
3061 return true;
3062}
3063
3064static void rbd_release_lock_work(struct work_struct *work)
3065{
3066 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3067 unlock_work);
3068
3069 down_write(&rbd_dev->lock_rwsem);
3070 rbd_release_lock(rbd_dev);
3071 up_write(&rbd_dev->lock_rwsem);
3072}
3073
3074static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3075 void **p)
3076{
3077 struct rbd_client_id cid = { 0 };
3078
3079 if (struct_v >= 2) {
3080 cid.gid = ceph_decode_64(p);
3081 cid.handle = ceph_decode_64(p);
3082 }
3083
3084 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3085 cid.handle);
3086 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3087 down_write(&rbd_dev->lock_rwsem);
3088 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3089 /*
3090 * we already know that the remote client is
3091 * the owner
3092 */
3093 up_write(&rbd_dev->lock_rwsem);
3094 return;
3095 }
3096
3097 rbd_set_owner_cid(rbd_dev, &cid);
3098 downgrade_write(&rbd_dev->lock_rwsem);
3099 } else {
3100 down_read(&rbd_dev->lock_rwsem);
3101 }
3102
3103 if (!__rbd_is_lock_owner(rbd_dev))
3104 wake_requests(rbd_dev, false);
3105 up_read(&rbd_dev->lock_rwsem);
3106}
3107
3108static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3109 void **p)
3110{
3111 struct rbd_client_id cid = { 0 };
3112
3113 if (struct_v >= 2) {
3114 cid.gid = ceph_decode_64(p);
3115 cid.handle = ceph_decode_64(p);
3116 }
3117
3118 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3119 cid.handle);
3120 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3121 down_write(&rbd_dev->lock_rwsem);
3122 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3123 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3124 __func__, rbd_dev, cid.gid, cid.handle,
3125 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3126 up_write(&rbd_dev->lock_rwsem);
3127 return;
3128 }
3129
3130 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3131 downgrade_write(&rbd_dev->lock_rwsem);
3132 } else {
3133 down_read(&rbd_dev->lock_rwsem);
3134 }
3135
3136 if (!__rbd_is_lock_owner(rbd_dev))
3137 wake_requests(rbd_dev, false);
3138 up_read(&rbd_dev->lock_rwsem);
3139}
3140
3141/*
3142 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3143 * ResponseMessage is needed.
3144 */
3145static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3146 void **p)
3147{
3148 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3149 struct rbd_client_id cid = { 0 };
3b77faa0 3150 int result = 1;
3151
3152 if (struct_v >= 2) {
3153 cid.gid = ceph_decode_64(p);
3154 cid.handle = ceph_decode_64(p);
3155 }
3156
3157 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3158 cid.handle);
3159 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3160 return result;
3161
3162 down_read(&rbd_dev->lock_rwsem);
3163 if (__rbd_is_lock_owner(rbd_dev)) {
3164 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3165 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3166 goto out_unlock;
3167
3168 /*
3169 * encode ResponseMessage(0) so the peer can detect
3170 * a missing owner
3171 */
3172 result = 0;
3173
3174 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3175 if (!rbd_dev->opts->exclusive) {
3176 dout("%s rbd_dev %p queueing unlock_work\n",
3177 __func__, rbd_dev);
3178 queue_work(rbd_dev->task_wq,
3179 &rbd_dev->unlock_work);
3180 } else {
3181 /* refuse to release the lock */
3182 result = -EROFS;
3183 }
3184 }
3185 }
3186
3187out_unlock:
ed95b21a 3188 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3189 return result;
3190}
3191
3192static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3193 u64 notify_id, u64 cookie, s32 *result)
3194{
3195 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3196 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3197 int buf_size = sizeof(buf);
3198 int ret;
3199
3200 if (result) {
3201 void *p = buf;
3202
3203 /* encode ResponseMessage */
3204 ceph_start_encoding(&p, 1, 1,
3205 buf_size - CEPH_ENCODING_START_BLK_LEN);
3206 ceph_encode_32(&p, *result);
3207 } else {
3208 buf_size = 0;
3209 }
b8d70035 3210
3211 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3212 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3213 buf, buf_size);
52bb1f9b 3214 if (ret)
3215 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3216}
3217
3218static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3219 u64 cookie)
3220{
3221 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3222 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3223}
3224
3225static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3226 u64 notify_id, u64 cookie, s32 result)
3227{
3228 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3229 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3230}
3231
3232static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3233 u64 notifier_id, void *data, size_t data_len)
3234{
3235 struct rbd_device *rbd_dev = arg;
3236 void *p = data;
3237 void *const end = p + data_len;
d4c2269b 3238 u8 struct_v = 0;
3239 u32 len;
3240 u32 notify_op;
3241 int ret;
3242
3243 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3244 __func__, rbd_dev, cookie, notify_id, data_len);
3245 if (data_len) {
3246 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3247 &struct_v, &len);
3248 if (ret) {
3249 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3250 ret);
3251 return;
3252 }
3253
3254 notify_op = ceph_decode_32(&p);
3255 } else {
3256 /* legacy notification for header updates */
3257 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3258 len = 0;
3259 }
3260
3261 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3262 switch (notify_op) {
3263 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3264 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3265 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3266 break;
3267 case RBD_NOTIFY_OP_RELEASED_LOCK:
3268 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3269 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3270 break;
3271 case RBD_NOTIFY_OP_REQUEST_LOCK:
3272 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3273 if (ret <= 0)
ed95b21a 3274 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3275 cookie, ret);
3276 else
3277 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3278 break;
3279 case RBD_NOTIFY_OP_HEADER_UPDATE:
3280 ret = rbd_dev_refresh(rbd_dev);
3281 if (ret)
3282 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3283
3284 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3285 break;
3286 default:
3287 if (rbd_is_lock_owner(rbd_dev))
3288 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3289 cookie, -EOPNOTSUPP);
3290 else
3291 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3292 break;
3293 }
3294}
3295
3296static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3297
922dab61 3298static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3299{
922dab61 3300 struct rbd_device *rbd_dev = arg;
bb040aa0 3301
922dab61 3302 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3303
3304 down_write(&rbd_dev->lock_rwsem);
3305 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3306 up_write(&rbd_dev->lock_rwsem);
3307
3308 mutex_lock(&rbd_dev->watch_mutex);
3309 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3310 __rbd_unregister_watch(rbd_dev);
3311 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3312
99d16943 3313 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3314 }
99d16943 3315 mutex_unlock(&rbd_dev->watch_mutex);
3316}
3317
9969ebc5 3318/*
99d16943 3319 * watch_mutex must be locked
9969ebc5 3320 */
99d16943 3321static int __rbd_register_watch(struct rbd_device *rbd_dev)
3322{
3323 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3324 struct ceph_osd_linger_request *handle;
9969ebc5 3325
922dab61 3326 rbd_assert(!rbd_dev->watch_handle);
99d16943 3327 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3328
3329 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3330 &rbd_dev->header_oloc, rbd_watch_cb,
3331 rbd_watch_errcb, rbd_dev);
3332 if (IS_ERR(handle))
3333 return PTR_ERR(handle);
8eb87565 3334
922dab61 3335 rbd_dev->watch_handle = handle;
b30a01f2 3336 return 0;
3337}
3338
3339/*
3340 * watch_mutex must be locked
3341 */
3342static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3343{
3344 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3345 int ret;
b30a01f2 3346
3347 rbd_assert(rbd_dev->watch_handle);
3348 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3349
3350 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3351 if (ret)
3352 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3353
922dab61 3354 rbd_dev->watch_handle = NULL;
3355}
3356
3357static int rbd_register_watch(struct rbd_device *rbd_dev)
3358{
3359 int ret;
3360
3361 mutex_lock(&rbd_dev->watch_mutex);
3362 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3363 ret = __rbd_register_watch(rbd_dev);
3364 if (ret)
3365 goto out;
3366
3367 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3368 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3369
3370out:
3371 mutex_unlock(&rbd_dev->watch_mutex);
3372 return ret;
3373}
3374
3375static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3376{
3377 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3378
3379 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3380 cancel_work_sync(&rbd_dev->acquired_lock_work);
3381 cancel_work_sync(&rbd_dev->released_lock_work);
3382 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3383 cancel_work_sync(&rbd_dev->unlock_work);
3384}
3385
3386static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3387{
ed95b21a 3388 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3389 cancel_tasks_sync(rbd_dev);
3390
3391 mutex_lock(&rbd_dev->watch_mutex);
3392 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3393 __rbd_unregister_watch(rbd_dev);
3394 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3395 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3396
811c6688 3397 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3398}
3399
3400/*
3401 * lock_rwsem must be held for write
3402 */
3403static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3404{
3405 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3406 char cookie[32];
3407 int ret;
3408
3409 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3410
3411 format_lock_cookie(rbd_dev, cookie);
3412 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3413 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3414 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3415 RBD_LOCK_TAG, cookie);
3416 if (ret) {
3417 if (ret != -EOPNOTSUPP)
3418 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3419 ret);
3420
3421 /*
3422 * Lock cookie cannot be updated on older OSDs, so do
3423 * a manual release and queue an acquire.
3424 */
3425 if (rbd_release_lock(rbd_dev))
3426 queue_delayed_work(rbd_dev->task_wq,
3427 &rbd_dev->lock_dwork, 0);
3428 } else {
edd8ca80 3429 __rbd_lock(rbd_dev, cookie);
3430 }
3431}
3432
3433static void rbd_reregister_watch(struct work_struct *work)
3434{
3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 struct rbd_device, watch_dwork);
3437 int ret;
3438
3439 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440
3441 mutex_lock(&rbd_dev->watch_mutex);
3442 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3443 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3444 return;
87c0fded 3445 }
3446
3447 ret = __rbd_register_watch(rbd_dev);
3448 if (ret) {
3449 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4d73644b 3450 if (ret == -EBLACKLISTED || ret == -ENOENT) {
87c0fded 3451 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
14bb211d 3452 wake_requests(rbd_dev, true);
87c0fded 3453 } else {
3454 queue_delayed_work(rbd_dev->task_wq,
3455 &rbd_dev->watch_dwork,
3456 RBD_RETRY_DELAY);
3457 }
3458 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3459 return;
3460 }
3461
3462 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3463 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3464 mutex_unlock(&rbd_dev->watch_mutex);
3465
3466 down_write(&rbd_dev->lock_rwsem);
3467 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3468 rbd_reacquire_lock(rbd_dev);
3469 up_write(&rbd_dev->lock_rwsem);
3470
3471 ret = rbd_dev_refresh(rbd_dev);
3472 if (ret)
f6870cc9 3473 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3474}
3475
36be9a76 3476/*
3477 * Synchronous osd object method call. Returns the number of bytes
3478 * returned in the reply (inbound) buffer, or a negative error code.
3479 */
3480static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3481 struct ceph_object_id *oid,
3482 struct ceph_object_locator *oloc,
36be9a76 3483 const char *method_name,
4157976b 3484 const void *outbound,
36be9a76 3485 size_t outbound_size,
4157976b 3486 void *inbound,
e2a58ee5 3487 size_t inbound_size)
36be9a76 3488{
3489 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3490 struct page *req_page = NULL;
3491 struct page *reply_page;
3492 int ret;
3493
3494 /*
3495 * Method calls are ultimately read operations. The result
3496 * should be placed into the inbound buffer provided. They
3497 * also supply outbound data--parameters for the object
3498 * method. Currently if this is present it will be a
3499 * snapshot id.
36be9a76 3500 */
3501 if (outbound) {
3502 if (outbound_size > PAGE_SIZE)
3503 return -E2BIG;
36be9a76 3504
3505 req_page = alloc_page(GFP_KERNEL);
3506 if (!req_page)
3507 return -ENOMEM;
04017e29 3508
ecd4a68a 3509 memcpy(page_address(req_page), outbound, outbound_size);
04017e29 3510 }
36be9a76 3511
3512 reply_page = alloc_page(GFP_KERNEL);
3513 if (!reply_page) {
3514 if (req_page)
3515 __free_page(req_page);
3516 return -ENOMEM;
3517 }
57385b51 3518
3519 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3520 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3521 reply_page, &inbound_size);
3522 if (!ret) {
3523 memcpy(inbound, page_address(reply_page), inbound_size);
3524 ret = inbound_size;
3525 }
36be9a76 3526
3527 if (req_page)
3528 __free_page(req_page);
3529 __free_page(reply_page);
3530 return ret;
3531}
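
/*
 * Usage sketch (illustrative only; it mirrors the "get_object_prefix"
 * call made further down in this file):
 *
 *	char buf[RBD_OBJ_PREFIX_LEN_MAX];
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc,
 *				  "get_object_prefix", NULL, 0,
 *				  buf, sizeof(buf));
 *
 * A non-negative return value is the number of reply bytes copied
 * into buf; a negative value is an error.
 */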
3532
3533/*
3534 * lock_rwsem must be held for read
3535 */
2f18d466 3536static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
3537{
3538 DEFINE_WAIT(wait);
3539 int ret = 0;
3540
3541 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
3542 return -EBLACKLISTED;
3543
3544 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3545 return 0;
3546
3547 if (!may_acquire) {
3548 rbd_warn(rbd_dev, "exclusive lock required");
3549 return -EROFS;
3550 }
3551
3552 do {
3553 /*
3554 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3555 * and cancel_delayed_work() in wake_requests().
3556 */
3557 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3558 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3559 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3560 TASK_UNINTERRUPTIBLE);
3561 up_read(&rbd_dev->lock_rwsem);
3562 schedule();
3563 down_read(&rbd_dev->lock_rwsem);
3564 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3565 ret = -EBLACKLISTED;
3566 break;
3567 }
3568 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
87c0fded 3569
ed95b21a 3570 finish_wait(&rbd_dev->lock_waitq, &wait);
2f18d466 3571 return ret;
3572}
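
/*
 * Caller pattern sketch (see rbd_queue_workfn() and
 * rbd_add_acquire_lock() below):
 *
 *	down_read(&rbd_dev->lock_rwsem);
 *	ret = rbd_wait_state_locked(rbd_dev, may_acquire);
 *	if (ret)
 *		goto err_unlock;	(lock_rwsem is still held for read)
 *	... submit I/O ...
 *	up_read(&rbd_dev->lock_rwsem);
 *
 * The wait loop above drops and retakes lock_rwsem around schedule(),
 * but the function always returns with it held for read.
 */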
3573
7ad18afa 3574static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 3575{
3576 struct request *rq = blk_mq_rq_from_pdu(work);
3577 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 3578 struct rbd_img_request *img_request;
4e752f0a 3579 struct ceph_snap_context *snapc = NULL;
3580 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3581 u64 length = blk_rq_bytes(rq);
6d2940c8 3582 enum obj_operation_type op_type;
4e752f0a 3583 u64 mapping_size;
80de1912 3584 bool must_be_locked;
3585 int result;
3586
3587 switch (req_op(rq)) {
3588 case REQ_OP_DISCARD:
6ac56951 3589 case REQ_OP_WRITE_ZEROES:
90e98c52 3590 op_type = OBJ_OP_DISCARD;
3591 break;
3592 case REQ_OP_WRITE:
6d2940c8 3593 op_type = OBJ_OP_WRITE;
3594 break;
3595 case REQ_OP_READ:
6d2940c8 3596 op_type = OBJ_OP_READ;
3597 break;
3598 default:
3599 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3600 result = -EIO;
3601 goto err;
3602 }
6d2940c8 3603
bc1ecc65 3604 /* Ignore/skip any zero-length requests */
bf0d5f50 3605
3606 if (!length) {
3607 dout("%s: zero-length request\n", __func__);
3608 result = 0;
3609 goto err_rq;
3610 }
bf0d5f50 3611
3612 rbd_assert(op_type == OBJ_OP_READ ||
3613 rbd_dev->spec->snap_id == CEPH_NOSNAP);
4dda41d3 3614
3615 /*
3616 * Quit early if the mapped snapshot no longer exists. It's
3617 * still possible the snapshot will have disappeared by the
3618 * time our request arrives at the osd, but there's no sense in
3619 * sending it if we already know.
3620 */
3621 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3622 dout("request for non-existent snapshot");
3623 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3624 result = -ENXIO;
3625 goto err_rq;
3626 }
4dda41d3 3627
3628 if (offset && length > U64_MAX - offset + 1) {
3629 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3630 length);
3631 result = -EINVAL;
3632 goto err_rq; /* Shouldn't happen */
3633 }
4dda41d3 3634
3635 blk_mq_start_request(rq);
3636
3637 down_read(&rbd_dev->header_rwsem);
3638 mapping_size = rbd_dev->mapping.size;
6d2940c8 3639 if (op_type != OBJ_OP_READ) {
3640 snapc = rbd_dev->header.snapc;
3641 ceph_get_snap_context(snapc);
3642 }
3643 up_read(&rbd_dev->header_rwsem);
3644
3645 if (offset + length > mapping_size) {
bc1ecc65 3646 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 3647 length, mapping_size);
3648 result = -EIO;
3649 goto err_rq;
3650 }
bf0d5f50 3651
3652 must_be_locked =
3653 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3654 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3655 if (must_be_locked) {
3656 down_read(&rbd_dev->lock_rwsem);
3657 result = rbd_wait_state_locked(rbd_dev,
3658 !rbd_dev->opts->exclusive);
3659 if (result)
87c0fded 3660 goto err_unlock;
3661 }
3662
dfd9875f 3663 img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3664 if (!img_request) {
3665 result = -ENOMEM;
ed95b21a 3666 goto err_unlock;
3667 }
3668 img_request->rq = rq;
70b16db8 3669 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 3670
90e98c52 3671 if (op_type == OBJ_OP_DISCARD)
5a237819 3672 result = rbd_img_fill_nodata(img_request, offset, length);
90e98c52 3673 else
3674 result = rbd_img_fill_from_bio(img_request, offset, length,
3675 rq->bio);
3676 if (result)
3677 goto err_img_request;
bf0d5f50 3678
efbd1a11 3679 rbd_img_request_submit(img_request);
3680 if (must_be_locked)
3681 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 3682 return;
bf0d5f50 3683
3684err_img_request:
3685 rbd_img_request_put(img_request);
3686err_unlock:
3687 if (must_be_locked)
3688 up_read(&rbd_dev->lock_rwsem);
3689err_rq:
3690 if (result)
3691 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 3692 obj_op_name(op_type), length, offset, result);
e96a650a 3693 ceph_put_snap_context(snapc);
7ad18afa 3694err:
2a842aca 3695 blk_mq_end_request(rq, errno_to_blk_status(result));
bc1ecc65 3696}
bf0d5f50 3697
fc17b653 3698static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
7ad18afa 3699 const struct blk_mq_queue_data *bd)
bc1ecc65 3700{
3701 struct request *rq = bd->rq;
3702 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 3703
7ad18afa 3704 queue_work(rbd_wq, work);
fc17b653 3705 return BLK_STS_OK;
3706}
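
/*
 * The work_struct used above lives in the request's driver-private
 * pdu area (tag_set.cmd_size in rbd_init_disk() below) and was
 * initialized once per request in rbd_init_request(), so dispatching
 * a request amounts to a single queue_work() call.
 */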
3707
3708static void rbd_free_disk(struct rbd_device *rbd_dev)
3709{
3710 blk_cleanup_queue(rbd_dev->disk->queue);
3711 blk_mq_free_tag_set(&rbd_dev->tag_set);
3712 put_disk(rbd_dev->disk);
a0cab924 3713 rbd_dev->disk = NULL;
3714}
3715
788e2df3 3716static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3717 struct ceph_object_id *oid,
3718 struct ceph_object_locator *oloc,
3719 void *buf, int buf_len)
3720
3721{
3722 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3723 struct ceph_osd_request *req;
3724 struct page **pages;
3725 int num_pages = calc_pages_for(0, buf_len);
3726 int ret;
3727
3728 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3729 if (!req)
3730 return -ENOMEM;
788e2df3 3731
3732 ceph_oid_copy(&req->r_base_oid, oid);
3733 ceph_oloc_copy(&req->r_base_oloc, oloc);
3734 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 3735
fe5478e0 3736 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 3737 if (ret)
fe5478e0 3738 goto out_req;
788e2df3 3739
3740 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3741 if (IS_ERR(pages)) {
3742 ret = PTR_ERR(pages);
3743 goto out_req;
3744 }
1ceae7ef 3745
3746 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3747 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3748 true);
3749
3750 ceph_osdc_start_request(osdc, req, false);
3751 ret = ceph_osdc_wait_request(osdc, req);
3752 if (ret >= 0)
3753 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 3754
3755out_req:
3756 ceph_osdc_put_request(req);
3757 return ret;
3758}
3759
602adf40 3760/*
3761 * Read the complete header for the given rbd device. On successful
3762 * return, the rbd_dev->header field will contain up-to-date
3763 * information about the image.
602adf40 3764 */
99a41ebc 3765static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3766{
4156d998 3767 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3768 u32 snap_count = 0;
3769 u64 names_size = 0;
3770 u32 want_count;
3771 int ret;
602adf40 3772
00f1f36f 3773 /*
3774 * The complete header will include an array of its 64-bit
3775 * snapshot ids, followed by the names of those snapshots as
3776 * a contiguous block of NUL-terminated strings. Note that
3777 * the number of snapshots could change by the time we read
3778 * it in, in which case we re-read it.
00f1f36f 3779 */
3780 do {
3781 size_t size;
3782
3783 kfree(ondisk);
3784
3785 size = sizeof (*ondisk);
3786 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3787 size += names_size;
3788 ondisk = kmalloc(size, GFP_KERNEL);
3789 if (!ondisk)
662518b1 3790 return -ENOMEM;
4156d998 3791
3792 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3793 &rbd_dev->header_oloc, ondisk, size);
4156d998 3794 if (ret < 0)
662518b1 3795 goto out;
c0cd10db 3796 if ((size_t)ret < size) {
4156d998 3797 ret = -ENXIO;
3798 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3799 size, ret);
662518b1 3800 goto out;
3801 }
3802 if (!rbd_dev_ondisk_valid(ondisk)) {
3803 ret = -ENXIO;
06ecc6cb 3804 rbd_warn(rbd_dev, "invalid header");
662518b1 3805 goto out;
81e759fb 3806 }
602adf40 3807
3808 names_size = le64_to_cpu(ondisk->snap_names_len);
3809 want_count = snap_count;
3810 snap_count = le32_to_cpu(ondisk->snap_count);
3811 } while (snap_count != want_count);
00f1f36f 3812
3813 ret = rbd_header_from_disk(rbd_dev, ondisk);
3814out:
3815 kfree(ondisk);
3816
3817 return ret;
3818}
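
/*
 * Sketch of the on-disk v1 header read above, per the comment in
 * rbd_dev_v1_header_info():
 *
 *	struct rbd_image_header_ondisk			fixed-size part
 *	struct rbd_image_snap_ondisk[snap_count]	per-snapshot data
 *	char snap_names[names_size]		NUL-terminated names
 *
 * snap_count and names_size come from the previous pass, which is why
 * the read loop repeats until the snapshot count stops changing.
 */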
3819
3820/*
3821 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3822 * has disappeared from the (just updated) snapshot context.
3823 */
3824static void rbd_exists_validate(struct rbd_device *rbd_dev)
3825{
3826 u64 snap_id;
3827
3828 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3829 return;
3830
3831 snap_id = rbd_dev->spec->snap_id;
3832 if (snap_id == CEPH_NOSNAP)
3833 return;
3834
3835 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3836 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3837}
3838
3839static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3840{
3841 sector_t size;
3842
3843 /*
3844 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3845 * try to update its size. If REMOVING is set, updating size
3846 * is just useless work since the device can't be opened.
9875201e 3847 */
3848 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3849 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3850 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3851 dout("setting size to %llu sectors", (unsigned long long)size);
3852 set_capacity(rbd_dev->disk, size);
3853 revalidate_disk(rbd_dev->disk);
3854 }
3855}
3856
cc4a38bd 3857static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3858{
e627db08 3859 u64 mapping_size;
3860 int ret;
3861
cfbf6377 3862 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 3863 mapping_size = rbd_dev->mapping.size;
3864
3865 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 3866 if (ret)
73e39e4d 3867 goto out;
15228ede 3868
3869 /*
3870 * If there is a parent, see if it has disappeared due to the
3871 * mapped image getting flattened.
3872 */
3873 if (rbd_dev->parent) {
3874 ret = rbd_dev_v2_parent_info(rbd_dev);
3875 if (ret)
73e39e4d 3876 goto out;
3877 }
3878
5ff1108c 3879 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 3880 rbd_dev->mapping.size = rbd_dev->header.image_size;
3881 } else {
3882 /* validate mapped snapshot's EXISTS flag */
3883 rbd_exists_validate(rbd_dev);
3884 }
15228ede 3885
73e39e4d 3886out:
cfbf6377 3887 up_write(&rbd_dev->header_rwsem);
73e39e4d 3888 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 3889 rbd_dev_update_size(rbd_dev);
1fe5e993 3890
73e39e4d 3891 return ret;
3892}
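
/*
 * rbd_dev_refresh() is invoked from the watch reregistration path
 * above and from the sysfs "refresh" attribute below, so both the
 * header and the parent info can be brought up to date at runtime.
 */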
3893
3894static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3895 unsigned int hctx_idx, unsigned int numa_node)
3896{
3897 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3898
3899 INIT_WORK(work, rbd_queue_workfn);
3900 return 0;
3901}
3902
f363b089 3903static const struct blk_mq_ops rbd_mq_ops = {
7ad18afa 3904 .queue_rq = rbd_queue_rq,
3905 .init_request = rbd_init_request,
3906};
3907
3908static int rbd_init_disk(struct rbd_device *rbd_dev)
3909{
3910 struct gendisk *disk;
3911 struct request_queue *q;
593a9e7b 3912 u64 segment_size;
7ad18afa 3913 int err;
602adf40 3914
602adf40 3915 /* create gendisk info */
3916 disk = alloc_disk(single_major ?
3917 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3918 RBD_MINORS_PER_MAJOR);
602adf40 3919 if (!disk)
1fcdb8aa 3920 return -ENOMEM;
602adf40 3921
f0f8cef5 3922 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3923 rbd_dev->dev_id);
602adf40 3924 disk->major = rbd_dev->major;
dd82fff1 3925 disk->first_minor = rbd_dev->minor;
3926 if (single_major)
3927 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
3928 disk->fops = &rbd_bd_ops;
3929 disk->private_data = rbd_dev;
3930
3931 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3932 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 3933 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 3934 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 3935 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3936 rbd_dev->tag_set.nr_hw_queues = 1;
3937 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3938
3939 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3940 if (err)
602adf40 3941 goto out_disk;
029bcbd8 3942
3943 q = blk_mq_init_queue(&rbd_dev->tag_set);
3944 if (IS_ERR(q)) {
3945 err = PTR_ERR(q);
3946 goto out_tag_set;
3947 }
3948
8b904b5b 3949 blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
d8a2c89c 3950 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 3951
029bcbd8 3952 /* set io sizes to object size */
3953 segment_size = rbd_obj_bytes(&rbd_dev->header);
3954 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 3955 q->limits.max_sectors = queue_max_hw_sectors(q);
21acdf45 3956 blk_queue_max_segments(q, USHRT_MAX);
24f1df60 3957 blk_queue_max_segment_size(q, UINT_MAX);
3958 blk_queue_io_min(q, segment_size);
3959 blk_queue_io_opt(q, segment_size);
029bcbd8 3960
90e98c52 3961 /* enable the discard support */
8b904b5b 3962 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
90e98c52 3963 q->limits.discard_granularity = segment_size;
2bb4cd5c 3964 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
6ac56951 3965 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
90e98c52 3966
bae818ee 3967 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
dc3b17cc 3968 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
bae818ee 3969
3970 /*
3971 * disk_release() expects a queue ref from add_disk() and will
3972 * put it. Hold an extra ref until add_disk() is called.
3973 */
3974 WARN_ON(!blk_get_queue(q));
602adf40 3975 disk->queue = q;
3976 q->queuedata = rbd_dev;
3977
3978 rbd_dev->disk = disk;
602adf40 3979
602adf40 3980 return 0;
3981out_tag_set:
3982 blk_mq_free_tag_set(&rbd_dev->tag_set);
3983out_disk:
3984 put_disk(disk);
7ad18afa 3985 return err;
3986}
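
/*
 * Worked example (assuming the common default object order of 22,
 * i.e. 4 MiB objects): segment_size = 4194304 bytes, so
 * max_hw_sectors and max_discard_sectors are 4194304 / 512 = 8192
 * sectors, and io_min/io_opt are both 4 MiB.
 */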
3987
3988/*
3989 sysfs
3990*/
3991
3992static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3993{
3994 return container_of(dev, struct rbd_device, dev);
3995}
3996
3997static ssize_t rbd_size_show(struct device *dev,
3998 struct device_attribute *attr, char *buf)
3999{
593a9e7b 4000 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4001
4002 return sprintf(buf, "%llu\n",
4003 (unsigned long long)rbd_dev->mapping.size);
4004}
4005
4006/*
4007 * Note this shows the features for whatever's mapped, which is not
4008 * necessarily the base image.
4009 */
4010static ssize_t rbd_features_show(struct device *dev,
4011 struct device_attribute *attr, char *buf)
4012{
4013 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4014
4015 return sprintf(buf, "0x%016llx\n",
fc71d833 4016 (unsigned long long)rbd_dev->mapping.features);
4017}
4018
4019static ssize_t rbd_major_show(struct device *dev,
4020 struct device_attribute *attr, char *buf)
4021{
593a9e7b 4022 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4023
4024 if (rbd_dev->major)
4025 return sprintf(buf, "%d\n", rbd_dev->major);
4026
4027 return sprintf(buf, "(none)\n");
4028}
4029
4030static ssize_t rbd_minor_show(struct device *dev,
4031 struct device_attribute *attr, char *buf)
4032{
4033 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4034
dd82fff1 4035 return sprintf(buf, "%d\n", rbd_dev->minor);
4036}
4037
4038static ssize_t rbd_client_addr_show(struct device *dev,
4039 struct device_attribute *attr, char *buf)
4040{
4041 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4042 struct ceph_entity_addr *client_addr =
4043 ceph_client_addr(rbd_dev->rbd_client->client);
4044
4045 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4046 le32_to_cpu(client_addr->nonce));
4047}
4048
4049static ssize_t rbd_client_id_show(struct device *dev,
4050 struct device_attribute *attr, char *buf)
602adf40 4051{
593a9e7b 4052 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4053
1dbb4399 4054 return sprintf(buf, "client%lld\n",
033268a5 4055 ceph_client_gid(rbd_dev->rbd_client->client));
4056}
4057
4058static ssize_t rbd_cluster_fsid_show(struct device *dev,
4059 struct device_attribute *attr, char *buf)
4060{
4061 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4062
4063 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4064}
4065
4066static ssize_t rbd_config_info_show(struct device *dev,
4067 struct device_attribute *attr, char *buf)
4068{
4069 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4070
4071 return sprintf(buf, "%s\n", rbd_dev->config_info);
4072}
4073
4074static ssize_t rbd_pool_show(struct device *dev,
4075 struct device_attribute *attr, char *buf)
602adf40 4076{
593a9e7b 4077 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4078
0d7dbfce 4079 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4080}
4081
4082static ssize_t rbd_pool_id_show(struct device *dev,
4083 struct device_attribute *attr, char *buf)
4084{
4085 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4086
0d7dbfce 4087 return sprintf(buf, "%llu\n",
fc71d833 4088 (unsigned long long) rbd_dev->spec->pool_id);
4089}
4090
4091static ssize_t rbd_name_show(struct device *dev,
4092 struct device_attribute *attr, char *buf)
4093{
593a9e7b 4094 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4095
4096 if (rbd_dev->spec->image_name)
4097 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4098
4099 return sprintf(buf, "(unknown)\n");
4100}
4101
4102static ssize_t rbd_image_id_show(struct device *dev,
4103 struct device_attribute *attr, char *buf)
4104{
4105 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4106
0d7dbfce 4107 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4108}
4109
4110/*
4111 * Shows the name of the currently-mapped snapshot (or
4112 * RBD_SNAP_HEAD_NAME for the base image).
4113 */
4114static ssize_t rbd_snap_show(struct device *dev,
4115 struct device_attribute *attr,
4116 char *buf)
4117{
593a9e7b 4118 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4119
0d7dbfce 4120 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4121}
4122
4123static ssize_t rbd_snap_id_show(struct device *dev,
4124 struct device_attribute *attr, char *buf)
4125{
4126 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4127
4128 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4129}
4130
86b00e0d 4131/*
4132 * For a v2 image, shows the chain of parent images, separated by empty
4133 * lines. For v1 images or if there is no parent, shows "(no parent
4134 * image)".
4135 */
4136static ssize_t rbd_parent_show(struct device *dev,
4137 struct device_attribute *attr,
4138 char *buf)
4139{
4140 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4141 ssize_t count = 0;
86b00e0d 4142
ff96128f 4143 if (!rbd_dev->parent)
4144 return sprintf(buf, "(no parent image)\n");
4145
4146 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4147 struct rbd_spec *spec = rbd_dev->parent_spec;
4148
4149 count += sprintf(&buf[count], "%s"
4150 "pool_id %llu\npool_name %s\n"
4151 "image_id %s\nimage_name %s\n"
4152 "snap_id %llu\nsnap_name %s\n"
4153 "overlap %llu\n",
4154 !count ? "" : "\n", /* first? */
4155 spec->pool_id, spec->pool_name,
4156 spec->image_id, spec->image_name ?: "(unknown)",
4157 spec->snap_id, spec->snap_name,
4158 rbd_dev->parent_overlap);
4159 }
4160
4161 return count;
4162}
4163
4164static ssize_t rbd_image_refresh(struct device *dev,
4165 struct device_attribute *attr,
4166 const char *buf,
4167 size_t size)
4168{
593a9e7b 4169 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4170 int ret;
602adf40 4171
cc4a38bd 4172 ret = rbd_dev_refresh(rbd_dev);
e627db08 4173 if (ret)
52bb1f9b 4174 return ret;
b813623a 4175
52bb1f9b 4176 return size;
dfc5606d 4177}
602adf40 4178
dfc5606d 4179static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4180static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4181static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4182static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4183static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4184static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4185static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4186static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4187static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4188static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4189static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4190static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4191static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4192static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4193static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4194static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4195
4196static struct attribute *rbd_attrs[] = {
4197 &dev_attr_size.attr,
34b13184 4198 &dev_attr_features.attr,
dfc5606d 4199 &dev_attr_major.attr,
dd82fff1 4200 &dev_attr_minor.attr,
005a07bf 4201 &dev_attr_client_addr.attr,
dfc5606d 4202 &dev_attr_client_id.attr,
267fb90b 4203 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4204 &dev_attr_config_info.attr,
dfc5606d 4205 &dev_attr_pool.attr,
9bb2f334 4206 &dev_attr_pool_id.attr,
dfc5606d 4207 &dev_attr_name.attr,
589d30e0 4208 &dev_attr_image_id.attr,
dfc5606d 4209 &dev_attr_current_snap.attr,
92a58671 4210 &dev_attr_snap_id.attr,
86b00e0d 4211 &dev_attr_parent.attr,
dfc5606d 4212 &dev_attr_refresh.attr,
4213 NULL
4214};
4215
4216static struct attribute_group rbd_attr_group = {
4217 .attrs = rbd_attrs,
4218};
4219
4220static const struct attribute_group *rbd_attr_groups[] = {
4221 &rbd_attr_group,
4222 NULL
4223};
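
/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/.
 * Hypothetical session (device id and values made up):
 *
 *	$ cat /sys/bus/rbd/devices/0/pool
 *	rbd
 *	$ cat /sys/bus/rbd/devices/0/current_snap
 *	-
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 */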
4224
6cac4695 4225static void rbd_dev_release(struct device *dev);
dfc5606d 4226
b9942bc9 4227static const struct device_type rbd_device_type = {
4228 .name = "rbd",
4229 .groups = rbd_attr_groups,
6cac4695 4230 .release = rbd_dev_release,
4231};
4232
4233static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4234{
4235 kref_get(&spec->kref);
4236
4237 return spec;
4238}
4239
4240static void rbd_spec_free(struct kref *kref);
4241static void rbd_spec_put(struct rbd_spec *spec)
4242{
4243 if (spec)
4244 kref_put(&spec->kref, rbd_spec_free);
4245}
4246
4247static struct rbd_spec *rbd_spec_alloc(void)
4248{
4249 struct rbd_spec *spec;
4250
4251 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4252 if (!spec)
4253 return NULL;
4254
4255 spec->pool_id = CEPH_NOPOOL;
4256 spec->snap_id = CEPH_NOSNAP;
4257 kref_init(&spec->kref);
4258
4259 return spec;
4260}
4261
4262static void rbd_spec_free(struct kref *kref)
4263{
4264 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4265
4266 kfree(spec->pool_name);
4267 kfree(spec->image_id);
4268 kfree(spec->image_name);
4269 kfree(spec->snap_name);
4270 kfree(spec);
4271}
4272
1643dfa4 4273static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4274{
99d16943 4275 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4276 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4277
c41d13a3 4278 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4279 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4280 kfree(rbd_dev->config_info);
c41d13a3 4281
4282 rbd_put_client(rbd_dev->rbd_client);
4283 rbd_spec_put(rbd_dev->spec);
4284 kfree(rbd_dev->opts);
4285 kfree(rbd_dev);
4286}
4287
4288static void rbd_dev_release(struct device *dev)
4289{
4290 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4291 bool need_put = !!rbd_dev->opts;
4292
4293 if (need_put) {
4294 destroy_workqueue(rbd_dev->task_wq);
4295 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4296 }
4297
4298 rbd_dev_free(rbd_dev);
4299
4300 /*
4301 * This is racy, but way better than putting module outside of
4302 * the release callback. The race window is pretty small, so
4303 * doing something similar to dm (dm-builtin.c) is overkill.
4304 */
4305 if (need_put)
4306 module_put(THIS_MODULE);
4307}
4308
4309static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4310 struct rbd_spec *spec)
4311{
4312 struct rbd_device *rbd_dev;
4313
1643dfa4 4314 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4315 if (!rbd_dev)
4316 return NULL;
4317
4318 spin_lock_init(&rbd_dev->lock);
4319 INIT_LIST_HEAD(&rbd_dev->node);
4320 init_rwsem(&rbd_dev->header_rwsem);
4321
7e97332e 4322 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4323 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4324 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4325
4326 mutex_init(&rbd_dev->watch_mutex);
4327 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4328 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4329
4330 init_rwsem(&rbd_dev->lock_rwsem);
4331 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4332 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4333 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4334 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4335 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4336 init_waitqueue_head(&rbd_dev->lock_waitq);
4337
4338 rbd_dev->dev.bus = &rbd_bus_type;
4339 rbd_dev->dev.type = &rbd_device_type;
4340 rbd_dev->dev.parent = &rbd_root_dev;
4341 device_initialize(&rbd_dev->dev);
4342
c53d5893 4343 rbd_dev->rbd_client = rbdc;
d147543d 4344 rbd_dev->spec = spec;
0903e875 4345
4346 return rbd_dev;
4347}
4348
4349/*
4350 * Create a mapping rbd_dev.
4351 */
4352static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4353 struct rbd_spec *spec,
4354 struct rbd_options *opts)
4355{
4356 struct rbd_device *rbd_dev;
4357
4358 rbd_dev = __rbd_dev_create(rbdc, spec);
4359 if (!rbd_dev)
4360 return NULL;
4361
4362 rbd_dev->opts = opts;
4363
4364 /* get an id and fill in device name */
4365 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4366 minor_to_rbd_dev_id(1 << MINORBITS),
4367 GFP_KERNEL);
4368 if (rbd_dev->dev_id < 0)
4369 goto fail_rbd_dev;
4370
4371 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4372 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4373 rbd_dev->name);
4374 if (!rbd_dev->task_wq)
4375 goto fail_dev_id;
dd5ac32d 4376
4377 /* we have a ref from do_rbd_add() */
4378 __module_get(THIS_MODULE);
dd5ac32d 4379
1643dfa4 4380 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4381 return rbd_dev;
4382
4383fail_dev_id:
4384 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4385fail_rbd_dev:
4386 rbd_dev_free(rbd_dev);
4387 return NULL;
4388}
4389
4390static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4391{
4392 if (rbd_dev)
4393 put_device(&rbd_dev->dev);
4394}
4395
4396/*
4397 * Get the size and object order for an image snapshot, or if
4398 * snap_id is CEPH_NOSNAP, gets this information for the base
4399 * image.
4400 */
4401static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4402 u8 *order, u64 *snap_size)
4403{
4404 __le64 snapid = cpu_to_le64(snap_id);
4405 int ret;
4406 struct {
4407 u8 order;
4408 __le64 size;
4409 } __attribute__ ((packed)) size_buf = { 0 };
4410
4411 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4412 &rbd_dev->header_oloc, "get_size",
4413 &snapid, sizeof(snapid),
4414 &size_buf, sizeof(size_buf));
36be9a76 4415 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4416 if (ret < 0)
4417 return ret;
4418 if (ret < sizeof (size_buf))
4419 return -ERANGE;
9d475de5 4420
c3545579 4421 if (order) {
c86f86e9 4422 *order = size_buf.order;
4423 dout(" order %u", (unsigned int)*order);
4424 }
4425 *snap_size = le64_to_cpu(size_buf.size);
4426
4427 dout(" snap_id 0x%016llx snap_size = %llu\n",
4428 (unsigned long long)snap_id,
57385b51 4429 (unsigned long long)*snap_size);
4430
4431 return 0;
4432}
4433
4434static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4435{
4436 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4437 &rbd_dev->header.obj_order,
4438 &rbd_dev->header.image_size);
4439}
4440
4441static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4442{
4443 void *reply_buf;
4444 int ret;
4445 void *p;
4446
4447 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4448 if (!reply_buf)
4449 return -ENOMEM;
4450
4451 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4452 &rbd_dev->header_oloc, "get_object_prefix",
4453 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4454 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4455 if (ret < 0)
4456 goto out;
4457
4458 p = reply_buf;
4459 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4460 p + ret, NULL, GFP_NOIO);
4461 ret = 0;
4462
4463 if (IS_ERR(rbd_dev->header.object_prefix)) {
4464 ret = PTR_ERR(rbd_dev->header.object_prefix);
4465 rbd_dev->header.object_prefix = NULL;
4466 } else {
4467 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4468 }
4469out:
4470 kfree(reply_buf);
4471
4472 return ret;
4473}
4474
4475static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4476 u64 *snap_features)
4477{
4478 __le64 snapid = cpu_to_le64(snap_id);
4479 struct {
4480 __le64 features;
4481 __le64 incompat;
4157976b 4482 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4483 u64 unsup;
4484 int ret;
4485
4486 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4487 &rbd_dev->header_oloc, "get_features",
4488 &snapid, sizeof(snapid),
4489 &features_buf, sizeof(features_buf));
36be9a76 4490 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4491 if (ret < 0)
4492 return ret;
4493 if (ret < sizeof (features_buf))
4494 return -ERANGE;
d889140c 4495
4496 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4497 if (unsup) {
4498 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4499 unsup);
b8f5c6ed 4500 return -ENXIO;
d3767f0f 4501 }
d889140c 4502
4503 *snap_features = le64_to_cpu(features_buf.features);
4504
4505 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4506 (unsigned long long)snap_id,
4507 (unsigned long long)*snap_features,
4508 (unsigned long long)le64_to_cpu(features_buf.incompat));
4509
4510 return 0;
4511}
4512
4513static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4514{
4515 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4516 &rbd_dev->header.features);
4517}
4518
4519static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4520{
4521 struct rbd_spec *parent_spec;
4522 size_t size;
4523 void *reply_buf = NULL;
4524 __le64 snapid;
4525 void *p;
4526 void *end;
642a2537 4527 u64 pool_id;
86b00e0d 4528 char *image_id;
3b5cf2a2 4529 u64 snap_id;
86b00e0d 4530 u64 overlap;
4531 int ret;
4532
4533 parent_spec = rbd_spec_alloc();
4534 if (!parent_spec)
4535 return -ENOMEM;
4536
4537 size = sizeof (__le64) + /* pool_id */
4538 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4539 sizeof (__le64) + /* snap_id */
4540 sizeof (__le64); /* overlap */
4541 reply_buf = kmalloc(size, GFP_KERNEL);
4542 if (!reply_buf) {
4543 ret = -ENOMEM;
4544 goto out_err;
4545 }
4546
4d9b67cd 4547 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4548 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4549 &rbd_dev->header_oloc, "get_parent",
4550 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4551 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4552 if (ret < 0)
4553 goto out_err;
4554
86b00e0d 4555 p = reply_buf;
4556 end = reply_buf + ret;
4557 ret = -ERANGE;
642a2537 4558 ceph_decode_64_safe(&p, end, pool_id, out_err);
4559 if (pool_id == CEPH_NOPOOL) {
4560 /*
4561 * Either the parent never existed, or we have
4562 * record of it but the image got flattened so it no
4563 * longer has a parent. When the parent of a
4564 * layered image disappears we immediately set the
4565 * overlap to 0. The effect of this is that all new
4566 * requests will be treated as if the image had no
4567 * parent.
4568 */
4569 if (rbd_dev->parent_overlap) {
4570 rbd_dev->parent_overlap = 0;
4571 rbd_dev_parent_put(rbd_dev);
4572 pr_info("%s: clone image has been flattened\n",
4573 rbd_dev->disk->disk_name);
4574 }
4575
86b00e0d 4576 goto out; /* No parent? No problem. */
392a9dad 4577 }
86b00e0d 4578
4579 /* The ceph file layout needs to fit pool id in 32 bits */
4580
4581 ret = -EIO;
642a2537 4582 if (pool_id > (u64)U32_MAX) {
9584d508 4583 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 4584 (unsigned long long)pool_id, U32_MAX);
57385b51 4585 goto out_err;
c0cd10db 4586 }
0903e875 4587
979ed480 4588 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4589 if (IS_ERR(image_id)) {
4590 ret = PTR_ERR(image_id);
4591 goto out_err;
4592 }
3b5cf2a2 4593 ceph_decode_64_safe(&p, end, snap_id, out_err);
4594 ceph_decode_64_safe(&p, end, overlap, out_err);
4595
4596 /*
4597 * The parent won't change (except when the clone is
4598 * flattened, already handled that). So we only need to
4599 * record the parent spec if we have not already done so.
4600 */
4601 if (!rbd_dev->parent_spec) {
4602 parent_spec->pool_id = pool_id;
4603 parent_spec->image_id = image_id;
4604 parent_spec->snap_id = snap_id;
4605 rbd_dev->parent_spec = parent_spec;
4606 parent_spec = NULL; /* rbd_dev now owns this */
4607 } else {
4608 kfree(image_id);
4609 }
4610
4611 /*
4612 * We always update the parent overlap. If it's zero we issue
4613 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 4614 */
3b5cf2a2 4615 if (!overlap) {
3b5cf2a2 4616 if (parent_spec) {
4617 /* refresh, careful to warn just once */
4618 if (rbd_dev->parent_overlap)
4619 rbd_warn(rbd_dev,
4620 "clone now standalone (overlap became 0)");
3b5cf2a2 4621 } else {
4622 /* initial probe */
4623 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 4624 }
70cf49cf 4625 }
4626 rbd_dev->parent_overlap = overlap;
4627
4628out:
4629 ret = 0;
4630out_err:
4631 kfree(reply_buf);
4632 rbd_spec_put(parent_spec);
4633
4634 return ret;
4635}
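
/*
 * "get_parent" reply layout decoded above (all fields little-endian):
 *
 *	__le64	pool_id		CEPH_NOPOOL if there is no parent
 *	string	image_id	__le32 length followed by the bytes
 *	__le64	snap_id
 *	__le64	overlap
 */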
4636
4637static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4638{
4639 struct {
4640 __le64 stripe_unit;
4641 __le64 stripe_count;
4642 } __attribute__ ((packed)) striping_info_buf = { 0 };
4643 size_t size = sizeof (striping_info_buf);
4644 void *p;
4645 int ret;
4646
4647 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4648 &rbd_dev->header_oloc, "get_stripe_unit_count",
4649 NULL, 0, &striping_info_buf, size);
4650 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4651 if (ret < 0)
4652 return ret;
4653 if (ret < size)
4654 return -ERANGE;
4655
cc070d59 4656 p = &striping_info_buf;
4657 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4658 rbd_dev->header.stripe_count = ceph_decode_64(&p);
4659 return 0;
4660}
4661
4662static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4663{
4664 __le64 data_pool_id;
4665 int ret;
4666
4667 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4668 &rbd_dev->header_oloc, "get_data_pool",
4669 NULL, 0, &data_pool_id, sizeof(data_pool_id));
4670 if (ret < 0)
4671 return ret;
4672 if (ret < sizeof(data_pool_id))
4673 return -EBADMSG;
4674
4675 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4676 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4677 return 0;
4678}
4679
4680static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4681{
ecd4a68a 4682 CEPH_DEFINE_OID_ONSTACK(oid);
4683 size_t image_id_size;
4684 char *image_id;
4685 void *p;
4686 void *end;
4687 size_t size;
4688 void *reply_buf = NULL;
4689 size_t len = 0;
4690 char *image_name = NULL;
4691 int ret;
4692
4693 rbd_assert(!rbd_dev->spec->image_name);
4694
4695 len = strlen(rbd_dev->spec->image_id);
4696 image_id_size = sizeof (__le32) + len;
4697 image_id = kmalloc(image_id_size, GFP_KERNEL);
4698 if (!image_id)
4699 return NULL;
4700
4701 p = image_id;
4157976b 4702 end = image_id + image_id_size;
57385b51 4703 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4704
4705 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4706 reply_buf = kmalloc(size, GFP_KERNEL);
4707 if (!reply_buf)
4708 goto out;
4709
4710 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4711 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4712 "dir_get_name", image_id, image_id_size,
4713 reply_buf, size);
4714 if (ret < 0)
4715 goto out;
4716 p = reply_buf;
4717 end = reply_buf + ret;
4718
4719 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4720 if (IS_ERR(image_name))
4721 image_name = NULL;
4722 else
4723 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4724out:
4725 kfree(reply_buf);
4726 kfree(image_id);
4727
4728 return image_name;
4729}
4730
4731static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4732{
4733 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4734 const char *snap_name;
4735 u32 which = 0;
4736
4737 /* Skip over names until we find the one we are looking for */
4738
4739 snap_name = rbd_dev->header.snap_names;
4740 while (which < snapc->num_snaps) {
4741 if (!strcmp(name, snap_name))
4742 return snapc->snaps[which];
4743 snap_name += strlen(snap_name) + 1;
4744 which++;
4745 }
4746 return CEPH_NOSNAP;
4747}
4748
4749static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4750{
4751 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4752 u32 which;
4753 bool found = false;
4754 u64 snap_id;
4755
4756 for (which = 0; !found && which < snapc->num_snaps; which++) {
4757 const char *snap_name;
4758
4759 snap_id = snapc->snaps[which];
4760 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4761 if (IS_ERR(snap_name)) {
4762 /* ignore no-longer existing snapshots */
4763 if (PTR_ERR(snap_name) == -ENOENT)
4764 continue;
4765 else
4766 break;
4767 }
4768 found = !strcmp(name, snap_name);
4769 kfree(snap_name);
4770 }
4771 return found ? snap_id : CEPH_NOSNAP;
4772}
4773
4774/*
4775 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4776 * no snapshot by that name is found, or if an error occurs.
4777 */
4778static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4779{
4780 if (rbd_dev->image_format == 1)
4781 return rbd_v1_snap_id_by_name(rbd_dev, name);
4782
4783 return rbd_v2_snap_id_by_name(rbd_dev, name);
4784}
4785
9e15b77d 4786/*
4787 * An image being mapped will have everything but the snap id.
4788 */
4789static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4790{
4791 struct rbd_spec *spec = rbd_dev->spec;
4792
4793 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4794 rbd_assert(spec->image_id && spec->image_name);
4795 rbd_assert(spec->snap_name);
4796
4797 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4798 u64 snap_id;
4799
4800 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4801 if (snap_id == CEPH_NOSNAP)
4802 return -ENOENT;
4803
4804 spec->snap_id = snap_id;
4805 } else {
4806 spec->snap_id = CEPH_NOSNAP;
4807 }
4808
4809 return 0;
4810}
4811
4812/*
4813 * A parent image will have all ids but none of the names.
e1d4213f 4814 *
4815 * All names in an rbd spec are dynamically allocated. It's OK if we
4816 * can't figure out the name for an image id.
9e15b77d 4817 */
04077599 4818static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 4819{
4820 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4821 struct rbd_spec *spec = rbd_dev->spec;
4822 const char *pool_name;
4823 const char *image_name;
4824 const char *snap_name;
4825 int ret;
4826
4827 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4828 rbd_assert(spec->image_id);
4829 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 4830
2e9f7f1c 4831 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4832
4833 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4834 if (!pool_name) {
4835 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4836 return -EIO;
4837 }
4838 pool_name = kstrdup(pool_name, GFP_KERNEL);
4839 if (!pool_name)
4840 return -ENOMEM;
4841
4842 /* Fetch the image name; tolerate failure here */
4843
4844 image_name = rbd_dev_image_name(rbd_dev);
4845 if (!image_name)
06ecc6cb 4846 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4847
04077599 4848 /* Fetch the snapshot name */
9e15b77d 4849
2e9f7f1c 4850 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4851 if (IS_ERR(snap_name)) {
4852 ret = PTR_ERR(snap_name);
9e15b77d 4853 goto out_err;
4854 }
4855
4856 spec->pool_name = pool_name;
4857 spec->image_name = image_name;
4858 spec->snap_name = snap_name;
4859
4860 return 0;
04077599 4861
9e15b77d 4862out_err:
4863 kfree(image_name);
4864 kfree(pool_name);
4865 return ret;
4866}
4867
cc4a38bd 4868static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4869{
4870 size_t size;
4871 int ret;
4872 void *reply_buf;
4873 void *p;
4874 void *end;
4875 u64 seq;
4876 u32 snap_count;
4877 struct ceph_snap_context *snapc;
4878 u32 i;
4879
4880 /*
4881 * We'll need room for the seq value (maximum snapshot id),
4882 * snapshot count, and array of that many snapshot ids.
4883 * For now we have a fixed upper limit on the number we're
4884 * prepared to receive.
4885 */
4886 size = sizeof (__le64) + sizeof (__le32) +
4887 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4888 reply_buf = kzalloc(size, GFP_KERNEL);
4889 if (!reply_buf)
4890 return -ENOMEM;
4891
4892 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4893 &rbd_dev->header_oloc, "get_snapcontext",
4894 NULL, 0, reply_buf, size);
36be9a76 4895 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4896 if (ret < 0)
4897 goto out;
4898
35d489f9 4899 p = reply_buf;
4900 end = reply_buf + ret;
4901 ret = -ERANGE;
4902 ceph_decode_64_safe(&p, end, seq, out);
4903 ceph_decode_32_safe(&p, end, snap_count, out);
4904
4905 /*
4906 * Make sure the reported number of snapshot ids wouldn't go
4907 * beyond the end of our buffer. But before checking that,
4908 * make sure the computed size of the snapshot context we
4909 * allocate is representable in a size_t.
4910 */
4911 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4912 / sizeof (u64)) {
4913 ret = -EINVAL;
4914 goto out;
4915 }
4916 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4917 goto out;
468521c1 4918 ret = 0;
35d489f9 4919
812164f8 4920 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4921 if (!snapc) {
4922 ret = -ENOMEM;
4923 goto out;
4924 }
35d489f9 4925 snapc->seq = seq;
4926 for (i = 0; i < snap_count; i++)
4927 snapc->snaps[i] = ceph_decode_64(&p);
4928
49ece554 4929 ceph_put_snap_context(rbd_dev->header.snapc);
4930 rbd_dev->header.snapc = snapc;
4931
4932 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4933 (unsigned long long)seq, (unsigned int)snap_count);
4934out:
4935 kfree(reply_buf);
4936
57385b51 4937 return ret;
4938}
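
/*
 * "get_snapcontext" reply layout decoded above:
 *
 *	__le64	seq		maximum snapshot id
 *	__le32	snap_count
 *	__le64	snaps[snap_count]
 *
 * With the RBD_MAX_SNAP_COUNT limit of 510 used for the buffer, the
 * largest reply is 8 + 4 + 510 * 8 = 4092 bytes, i.e. it fits in a
 * single 4 KB allocation.
 */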
4939
4940static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4941 u64 snap_id)
4942{
4943 size_t size;
4944 void *reply_buf;
54cac61f 4945 __le64 snapid;
4946 int ret;
4947 void *p;
4948 void *end;
4949 char *snap_name;
4950
4951 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4952 reply_buf = kmalloc(size, GFP_KERNEL);
4953 if (!reply_buf)
4954 return ERR_PTR(-ENOMEM);
4955
54cac61f 4956 snapid = cpu_to_le64(snap_id);
4957 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4958 &rbd_dev->header_oloc, "get_snapshot_name",
4959 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4960 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4961 if (ret < 0) {
4962 snap_name = ERR_PTR(ret);
b8b1e2db 4963 goto out;
f40eb349 4964 }
4965
4966 p = reply_buf;
f40eb349 4967 end = reply_buf + ret;
e5c35534 4968 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4969 if (IS_ERR(snap_name))
b8b1e2db 4970 goto out;
b8b1e2db 4971
f40eb349 4972 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4973 (unsigned long long)snap_id, snap_name);
4974out:
4975 kfree(reply_buf);
4976
f40eb349 4977 return snap_name;
4978}
4979
2df3fac7 4980static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4981{
2df3fac7 4982 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4983 int ret;
117973fb 4984
4985 ret = rbd_dev_v2_image_size(rbd_dev);
4986 if (ret)
cfbf6377 4987 return ret;
1617e40c 4988
4989 if (first_time) {
4990 ret = rbd_dev_v2_header_onetime(rbd_dev);
4991 if (ret)
cfbf6377 4992 return ret;
4993 }
4994
cc4a38bd 4995 ret = rbd_dev_v2_snap_context(rbd_dev);
4996 if (ret && first_time) {
4997 kfree(rbd_dev->header.object_prefix);
4998 rbd_dev->header.object_prefix = NULL;
4999 }
5000
5001 return ret;
5002}
5003
5004static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5005{
5006 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5007
5008 if (rbd_dev->image_format == 1)
5009 return rbd_dev_v1_header_info(rbd_dev);
5010
5011 return rbd_dev_v2_header_info(rbd_dev);
5012}
5013
5014/*
5015 * Skips over white space at *buf, and updates *buf to point to the
5016 * first found non-space character (if any). Returns the length of
5017 * the token (string of non-white space characters) found. Note
5018 * that *buf must be terminated with '\0'.
5019 */
5020static inline size_t next_token(const char **buf)
5021{
5022 /*
5023 * These are the characters that produce nonzero for
5024 * isspace() in the "C" and "POSIX" locales.
5025 */
5026 const char *spaces = " \f\n\r\t\v";
5027
5028 *buf += strspn(*buf, spaces); /* Find start of token */
5029
5030 return strcspn(*buf, spaces); /* Return token length */
5031}
5032
5033/*
5034 * Finds the next token in *buf, dynamically allocates a buffer big
5035 * enough to hold a copy of it, and copies the token into the new
5036 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5037 * that a duplicate buffer is created even for a zero-length token.
5038 *
5039 * Returns a pointer to the newly-allocated duplicate, or a null
5040 * pointer if memory for the duplicate was not available. If
5041 * the lenp argument is a non-null pointer, the length of the token
5042 * (not including the '\0') is returned in *lenp.
5043 *
5044 * If successful, the *buf pointer will be updated to point beyond
5045 * the end of the found token.
5046 *
5047 * Note: uses GFP_KERNEL for allocation.
5048 */
5049static inline char *dup_token(const char **buf, size_t *lenp)
5050{
5051 char *dup;
5052 size_t len;
5053
5054 len = next_token(buf);
4caf35f9 5055 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5056 if (!dup)
5057 return NULL;
5058 *(dup + len) = '\0';
5059 *buf += len;
5060
5061 if (lenp)
5062 *lenp = len;
5063
5064 return dup;
5065}
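
/*
 * Sketch with a hypothetical input, parsing two tokens out of
 * "rbd foo":
 *
 *	const char *buf = "rbd foo";
 *	size_t len;
 *	char *pool = dup_token(&buf, &len);	pool = "rbd", len = 3
 *	char *image = dup_token(&buf, NULL);	image = "foo"
 *
 * Each call advances buf past the token it consumed.
 */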
5066
a725f65e 5067/*
5068 * Parse the options provided for an "rbd add" (i.e., rbd image
5069 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5070 * and the data written is passed here via a NUL-terminated buffer.
5071 * Returns 0 if successful or an error code otherwise.
d22f76e7 5072 *
5073 * The information extracted from these options is recorded in
5074 * the other parameters which return dynamically-allocated
5075 * structures:
5076 * ceph_opts
5077 * The address of a pointer that will refer to a ceph options
5078 * structure. Caller must release the returned pointer using
5079 * ceph_destroy_options() when it is no longer needed.
5080 * rbd_opts
5081 * Address of an rbd options pointer. Fully initialized by
5082 * this function; caller must release with kfree().
5083 * spec
5084 * Address of an rbd image specification pointer. Fully
5085 * initialized by this function based on parsed options.
5086 * Caller must release with rbd_spec_put().
5087 *
5088 * The options passed take this form:
5089 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5090 * where:
5091 * <mon_addrs>
5092 * A comma-separated list of one or more monitor addresses.
5093 * A monitor address is an ip address, optionally followed
5094 * by a port number (separated by a colon).
5095 * I.e.: ip1[:port1][,ip2[:port2]...]
5096 * <options>
5097 * A comma-separated list of ceph and/or rbd options.
5098 * <pool_name>
5099 * The name of the rados pool containing the rbd image.
5100 * <image_name>
5101 * The name of the image in that pool to map.
5102 * <snap_id>
5103 * An optional snapshot id. If provided, the mapping will
5104 * present data from the image at the time that snapshot was
5105 * created. The image head is used if no snapshot id is
5106 * provided. Snapshot mappings are always read-only.
a725f65e 5107 */
859c31df 5108static int rbd_add_parse_args(const char *buf,
dc79b113 5109 struct ceph_options **ceph_opts,
5110 struct rbd_options **opts,
5111 struct rbd_spec **rbd_spec)
e28fff26 5112{
d22f76e7 5113 size_t len;
859c31df 5114 char *options;
0ddebc0c 5115 const char *mon_addrs;
ecb4dc22 5116 char *snap_name;
0ddebc0c 5117 size_t mon_addrs_size;
859c31df 5118 struct rbd_spec *spec = NULL;
4e9afeba 5119 struct rbd_options *rbd_opts = NULL;
859c31df 5120 struct ceph_options *copts;
dc79b113 5121 int ret;
5122
5123 /* The first four tokens are required */
5124
7ef3214a 5125 len = next_token(&buf);
5126 if (!len) {
5127 rbd_warn(NULL, "no monitor address(es) provided");
5128 return -EINVAL;
5129 }
0ddebc0c 5130 mon_addrs = buf;
f28e565a 5131 mon_addrs_size = len + 1;
7ef3214a 5132 buf += len;
a725f65e 5133
dc79b113 5134 ret = -EINVAL;
5135 options = dup_token(&buf, NULL);
5136 if (!options)
dc79b113 5137 return -ENOMEM;
5138 if (!*options) {
5139 rbd_warn(NULL, "no options provided");
5140 goto out_err;
5141 }
e28fff26 5142
5143 spec = rbd_spec_alloc();
5144 if (!spec)
f28e565a 5145 goto out_mem;
5146
5147 spec->pool_name = dup_token(&buf, NULL);
5148 if (!spec->pool_name)
5149 goto out_mem;
5150 if (!*spec->pool_name) {
5151 rbd_warn(NULL, "no pool name provided");
5152 goto out_err;
5153 }
e28fff26 5154
69e7a02f 5155 spec->image_name = dup_token(&buf, NULL);
859c31df 5156 if (!spec->image_name)
f28e565a 5157 goto out_mem;
5158 if (!*spec->image_name) {
5159 rbd_warn(NULL, "no image name provided");
5160 goto out_err;
5161 }
d4b125e9 5162
5163 /*
5164 * Snapshot name is optional; default is to use "-"
5165 * (indicating the head/no snapshot).
5166 */
3feeb894 5167 len = next_token(&buf);
820a5f3e 5168 if (!len) {
5169 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5170 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5171 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5172 ret = -ENAMETOOLONG;
f28e565a 5173 goto out_err;
849b4260 5174 }
5175 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5176 if (!snap_name)
f28e565a 5177 goto out_mem;
5178 *(snap_name + len) = '\0';
5179 spec->snap_name = snap_name;
e5c35534 5180
0ddebc0c 5181 /* Initialize all rbd options to the defaults */
e28fff26 5182
5183 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5184 if (!rbd_opts)
5185 goto out_mem;
5186
5187 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5188 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5189 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5190 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d22f76e7 5191
859c31df 5192 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5193 mon_addrs + mon_addrs_size - 1,
4e9afeba 5194 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5195 if (IS_ERR(copts)) {
5196 ret = PTR_ERR(copts);
dc79b113
AE
5197 goto out_err;
5198 }
859c31df
AE
5199 kfree(options);
5200
5201 *ceph_opts = copts;
4e9afeba 5202 *opts = rbd_opts;
859c31df 5203 *rbd_spec = spec;
0ddebc0c 5204
dc79b113 5205 return 0;
f28e565a 5206out_mem:
dc79b113 5207 ret = -ENOMEM;
d22f76e7 5208out_err:
859c31df
AE
5209 kfree(rbd_opts);
5210 rbd_spec_put(spec);
f28e565a 5211 kfree(options);
d22f76e7 5212
dc79b113 5213 return ret;
a725f65e
AE
5214}

static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
{
        down_write(&rbd_dev->lock_rwsem);
        if (__rbd_is_lock_owner(rbd_dev))
                rbd_unlock(rbd_dev);
        up_write(&rbd_dev->lock_rwsem);
}

static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
{
        int ret;

        if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
                rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
                return -EINVAL;
        }

        /* FIXME: "rbd map --exclusive" should be interruptible */
        down_read(&rbd_dev->lock_rwsem);
        ret = rbd_wait_state_locked(rbd_dev, true);
        up_read(&rbd_dev->lock_rwsem);
        if (ret) {
                rbd_warn(rbd_dev, "failed to acquire exclusive lock");
                return -EROFS;
        }

        return 0;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        CEPH_DEFINE_OID_ONSTACK(oid);
        void *response;
        char *image_id;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.  We
         * do still need to set the image format though.
         */
        if (rbd_dev->spec->image_id) {
                rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

                return 0;
        }

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
                               rbd_dev->spec->image_name);
        if (ret)
                return ret;

        dout("rbd id object name is %s\n", oid.name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* If it doesn't exist we'll assume it's a format 1 image */

        ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
                                  "get_id", NULL, 0,
                                  response, RBD_IMAGE_ID_LEN_MAX);
        dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret == -ENOENT) {
                image_id = kstrdup("", GFP_KERNEL);
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
        } else if (ret >= 0) {
                void *p = response;

                image_id = ceph_extract_encoded_string(&p, p + ret,
                                                       NULL, GFP_NOIO);
                ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
        }

        if (!ret) {
                rbd_dev->spec->image_id = image_id;
                dout("image_id is %s\n", image_id);
        }
out:
        kfree(response);
        ceph_oid_destroy(&oid);
        return ret;
}
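
/*
 * Illustrative sketch of the lookup above, assuming the usual
 * rbd_types.h prefixes: for a format 2 image named "foo", the
 * "get_id" class method is invoked on the "rbd_id.foo" object and
 * the decoded reply becomes spec->image_id.  A format 1 image has no
 * such object, so -ENOENT results in an empty image_id and
 * image_format 1.
 */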

/*
 * Undo whatever state changes are made by a v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
        struct rbd_image_header *header;

        rbd_dev_parent_put(rbd_dev);

        /* Free dynamic fields from the header, then zero it out */

        header = &rbd_dev->header;
        ceph_put_snap_context(header->snapc);
        kfree(header->snap_sizes);
        kfree(header->snap_names);
        kfree(header->object_prefix);
        memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
        int ret;

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret)
                goto out_err;

        /*
         * Get and check features for the image.  Currently the
         * features are assumed to never change.
         */
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret)
                goto out_err;

        /* If the image supports fancy striping, get its parameters */

        if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
                ret = rbd_dev_v2_striping_info(rbd_dev);
                if (ret < 0)
                        goto out_err;
        }

        if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
                ret = rbd_dev_v2_data_pool(rbd_dev);
                if (ret)
                        goto out_err;
        }

        rbd_init_layout(rbd_dev);
        return 0;

out_err:
        rbd_dev->header.features = 0;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
        return ret;
}

/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
        struct rbd_device *parent = NULL;
        int ret;

        if (!rbd_dev->parent_spec)
                return 0;

        if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
                pr_info("parent chain is too long (%d)\n", depth);
                ret = -EINVAL;
                goto out_err;
        }

        parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
        if (!parent) {
                ret = -ENOMEM;
                goto out_err;
        }

        /*
         * Images related by parent/child relationships always share
         * rbd_client and spec/parent_spec, so bump their refcounts.
         */
        __rbd_get_client(rbd_dev->rbd_client);
        rbd_spec_get(rbd_dev->parent_spec);

        ret = rbd_dev_image_probe(parent, depth);
        if (ret < 0)
                goto out_err;

        rbd_dev->parent = parent;
        atomic_set(&rbd_dev->parent_ref, 1);
        return 0;

out_err:
        rbd_dev_unparent(rbd_dev);
        rbd_dev_destroy(parent);
        return ret;
}
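
/*
 * Sketch (not from the original source): mapping a clone whose
 * ancestry is child -> parent -> grandparent probes the three images
 * at depths 0, 1 and 2.  Each level shares its child's rbd_client and
 * parent_spec, and the walk fails with -EINVAL once ++depth exceeds
 * RBD_MAX_PARENT_CHAIN_LEN.
 */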

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        rbd_dev_mapping_clear(rbd_dev);
        rbd_free_disk(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
        int ret;

        /* Record our major and minor device numbers. */

        if (!single_major) {
                ret = register_blkdev(0, rbd_dev->name);
                if (ret < 0)
                        goto err_out_unlock;

                rbd_dev->major = ret;
                rbd_dev->minor = 0;
        } else {
                rbd_dev->major = rbd_major;
                rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
        }

        /* Set up the blkdev mapping. */

        ret = rbd_init_disk(rbd_dev);
        if (ret)
                goto err_out_blkdev;

        ret = rbd_dev_mapping_set(rbd_dev);
        if (ret)
                goto err_out_disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);

        ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
        if (ret)
                goto err_out_mapping;

        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        up_write(&rbd_dev->header_rwsem);
        return 0;

err_out_mapping:
        rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
        up_write(&rbd_dev->header_rwsem);
        return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
        struct rbd_spec *spec = rbd_dev->spec;
        int ret;

        /* Record the header object name for this rbd image. */

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
                                       spec->image_name, RBD_SUFFIX);
        else
                ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
                                       RBD_HEADER_PREFIX, spec->image_id);

        return ret;
}
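
/*
 * For example (assuming the usual rbd_types.h definitions of
 * RBD_SUFFIX and RBD_HEADER_PREFIX): a format 1 image named "foo"
 * gets header object "foo.rbd", while a format 2 image with id
 * "101a3a4bfd1b" gets header object "rbd_header.101a3a4bfd1b".
 */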

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        if (rbd_dev->opts)
                rbd_unregister_watch(rbd_dev);
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
        int ret;

        /*
         * Get the id from the image id object.  Unless there's an
         * error, rbd_dev->spec->image_id will be filled in with
         * a dynamically-allocated string, and rbd_dev->image_format
         * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (!depth) {
                ret = rbd_register_watch(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
                                pr_info("image %s/%s does not exist\n",
                                        rbd_dev->spec->pool_name,
                                        rbd_dev->spec->image_name);
                        goto err_out_format;
                }
        }

        ret = rbd_dev_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        /*
         * If this image is the one being mapped, we have pool name and
         * id, image name and id, and snap name - need to fill snap id.
         * Otherwise this is a parent image, identified by pool, image
         * and snap ids - need to fill in names for those ids.
         */
        if (!depth)
                ret = rbd_spec_fill_snap_id(rbd_dev);
        else
                ret = rbd_spec_fill_names(rbd_dev);
        if (ret) {
                if (ret == -ENOENT)
                        pr_info("snap %s/%s@%s does not exist\n",
                                rbd_dev->spec->pool_name,
                                rbd_dev->spec->image_name,
                                rbd_dev->spec->snap_name);
                goto err_out_probe;
        }

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;

                /*
                 * Need to warn users if this image is the one being
                 * mapped and has a parent.
                 */
                if (!depth && rbd_dev->parent_spec)
                        rbd_warn(rbd_dev,
                                 "WARNING: kernel layering is EXPERIMENTAL!");
        }

        ret = rbd_dev_probe_parent(rbd_dev, depth);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
             rbd_dev->image_format, rbd_dev->header_oid.name);
        return 0;

err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (!depth)
                rbd_unregister_watch(rbd_dev);
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
        return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        int rc;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto out;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }

        /* pick the pool */
        rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
        if (rc < 0) {
                if (rc == -ENOENT)
                        pr_info("pool %s does not exist\n", spec->pool_name);
                goto err_out_client;
        }
        spec->pool_id = (u64)rc;

        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
        if (!rbd_dev) {
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
        rbd_opts = NULL;        /* rbd_dev now owns this */

        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
        if (!rbd_dev->config_info) {
                rc = -ENOMEM;
                goto err_out_rbd_dev;
        }

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_image_probe(rbd_dev, 0);
        if (rc < 0) {
                up_write(&rbd_dev->header_rwsem);
                goto err_out_rbd_dev;
        }

        /* If we are mapping a snapshot it must be marked read-only */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                rbd_dev->opts->read_only = true;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc)
                goto err_out_image_probe;

        if (rbd_dev->opts->exclusive) {
                rc = rbd_add_acquire_lock(rbd_dev);
                if (rc)
                        goto err_out_device_setup;
        }

        /* Everything's ready.  Announce the disk to the world. */

        rc = device_add(&rbd_dev->dev);
        if (rc)
                goto err_out_image_lock;

        add_disk(rbd_dev->disk);
        /* see rbd_init_disk() */
        blk_put_queue(rbd_dev->disk->queue);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);

        pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
                rbd_dev->header.features);
        rc = count;
out:
        module_put(THIS_MODULE);
        return rc;

err_out_image_lock:
        rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
        rbd_dev_device_release(rbd_dev);
err_out_image_probe:
        rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        rbd_spec_put(spec);
        kfree(rbd_opts);
        goto out;
}
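
/*
 * Illustrative usage (values made up), matching the argument format
 * documented above rbd_add_parse_args():
 *
 *      $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *              > /sys/bus/rbd/add
 *
 * On success the write returns the full count and the mapping shows
 * up as /dev/rbd<dev_id>.
 */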

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
                                    const char *buf,
                                    size_t count)
{
        return do_rbd_add(bus, buf, count);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                rbd_dev_destroy(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}

static ssize_t do_rbd_remove(struct bus_type *bus,
                             const char *buf,
                             size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        char opt_buf[6];
        bool already = false;
        bool force = false;
        int ret;

        dev_id = -1;
        opt_buf[0] = '\0';
        sscanf(buf, "%d %5s", &dev_id, opt_buf);
        if (dev_id < 0) {
                pr_err("dev_id out of range\n");
                return -EINVAL;
        }
        if (opt_buf[0] != '\0') {
                if (!strcmp(opt_buf, "force")) {
                        force = true;
                } else {
                        pr_err("bad remove option at '%s'\n", opt_buf);
                        return -EINVAL;
                }
        }

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count && !force)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                                   &rbd_dev->flags);
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret < 0 || already)
                return ret;

        if (force) {
                /*
                 * Prevent new IO from being queued and wait for existing
                 * IO to complete/fail.
                 */
                blk_mq_freeze_queue(rbd_dev->disk->queue);
                blk_set_queue_dying(rbd_dev->disk->queue);
        }

        del_gendisk(rbd_dev->disk);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
        spin_unlock(&rbd_dev_list_lock);
        device_del(&rbd_dev->dev);

        rbd_dev_image_unlock(rbd_dev);
        rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        rbd_dev_destroy(rbd_dev);
        return count;
}
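
/*
 * Illustrative usage: to remove the mapping with dev_id 0, or to
 * force removal even while the device is still open:
 *
 *      $ echo 0 > /sys/bus/rbd/remove
 *      $ echo "0 force" > /sys/bus/rbd/remove
 */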

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
                                       const char *buf,
                                       size_t count)
{
        return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
        if (!rbd_obj_request_cache)
                goto out_err;

        return 0;

out_err:
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_slab_init();
        if (rc)
                return rc;

        /*
         * The number of active work items is limited by the number of
         * rbd devices * queue depth, so leave @max_active at default.
         */
        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
        if (!rbd_wq) {
                rc = -ENOMEM;
                goto err_out_slab;
        }

        if (single_major) {
                rbd_major = register_blkdev(0, RBD_DRV_NAME);
                if (rbd_major < 0) {
                        rc = rbd_major;
                        goto err_out_wq;
                }
        }

        rc = rbd_sysfs_init();
        if (rc)
                goto err_out_blkdev;

        if (single_major)
                pr_info("loaded (major %d)\n", rbd_major);
        else
                pr_info("loaded\n");

        return 0;

err_out_blkdev:
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
        destroy_workqueue(rbd_wq);
err_out_slab:
        rbd_slab_exit();
        return rc;
}

static void __exit rbd_exit(void)
{
        ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
        destroy_workqueue(rbd_wq);
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");