rbd: clear ->xferred on error from rbd_obj_issue_copyup()
drivers/block/rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
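
/*
 * Illustrative sketch (not a quote of later code, but mirroring how
 * parent_ref is handled further down in this file): a reference is
 * taken only while the counter can be safely accounted, e.g.
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		... issue I/O that touches the parent image ...
 *		atomic_dec_return_safe(&rbd_dev->parent_ref);
 *	}
 */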

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
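
/*
 * Example with hypothetical values: a mapped snapshot could be fully
 * identified by (pool_id = 2, image_id = "1028f9d9e4c6", snap_id = 7);
 * pool_name, image_name and snap_name are just the human-readable
 * names resolved from (or to) those ids.
 */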

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *        |     ^                              |
 *        v     \------------------------------/
 *      done
 *        ^
 *        |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};
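
/*
 * Reading of the diagram above (an interpretation, not additional
 * machinery): a guarded write that finds its object missing needs the
 * parent data copied up first, so it moves to RBD_OBJ_WRITE_COPYUP and
 * then loops back; a guarded write whose object already exists
 * completes directly.  Images without a parent skip the guard entirely
 * via RBD_OBJ_WRITE_FLAT.
 */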

struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;	/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64	size;
	u64	features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
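
/*
 * Example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4, device
 * id 3 owns minor 48 (3 << 4) for the whole device plus minors 49..63
 * for its partitions, so minor_to_rbd_dev_id(50) == 3.
 */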

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, 0200, NULL, rbd_add);
static BUS_ATTR(remove, 0200, NULL, rbd_remove);
static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_alloc_size, "alloc_size=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};
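
/*
 * Illustrative example: in a map option string such as
 * "queue_depth=128,lock_on_read,notrim", each comma-separated token
 * that libceph itself does not recognize is handed to
 * parse_rbd_opts_token() below, which matches it against
 * rbd_opts_tokens and fills in struct rbd_options accordingly.
 */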

struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true

struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct parse_rbd_opts_ctx *pctx = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		pctx->opts->queue_depth = intval;
		break;
	case Opt_alloc_size:
		if (intval < 1) {
			pr_err("alloc_size out of range\n");
			return -EINVAL;
		}
		if (!is_power_of_2(intval)) {
			pr_err("alloc_size must be a power of 2\n");
			return -EINVAL;
		}
		pctx->opts->alloc_size = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_pool_ns:
		kfree(pctx->spec->pool_ns);
		pctx->spec->pool_ns = match_strdup(argstr);
		if (!pctx->spec->pool_ns)
			return -ENOMEM;
		break;
	case Opt_read_only:
		pctx->opts->read_only = true;
		break;
	case Opt_read_write:
		pctx->opts->read_only = false;
		break;
	case Opt_lock_on_read:
		pctx->opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		pctx->opts->exclusive = true;
		break;
	case Opt_notrim:
		pctx->opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	case OBJ_OP_ZEROOUT:
		return "zeroout";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client.  Takes and releases rbd_client_list_lock to
 * unlink the client from rbd_client_list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static int wait_for_latest_osdmap(struct ceph_client *client)
{
	u64 newest_epoch;
	int ret;

	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	if (client->osdc.osdmap->epoch >= newest_epoch)
		return 0;

	ceph_osdc_maybe_request_map(&client->osdc);
	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
				     client->options->mount_timeout);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = wait_for_latest_osdmap(rbdc->client);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
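
/*
 * Worked example (hypothetical ids): with snapc->snaps = { 340, 125, 12 }
 * (descending, as the OSD keeps it), looking up snap_id 125 returns
 * index 1, while snap_id 100 returns BAD_SNAP_INDEX.
 */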

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
	case OBJ_OP_ZEROOUT:
		return true;
	default:
		BUG();
	}
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	/*
	 * Data objects may be stored in a separate pool, but always in
	 * the same namespace in that pool as the header in its pool.
	 */
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_base_oloc.pool = rbd_dev->layout.pool_id;

	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		rbd_assert(0);
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}
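
/*
 * The get/put above pair up around the lifetime of a layered image
 * request: rbd_img_request_create() takes a parent reference via
 * rbd_dev_parent_get() before marking the request layered, and
 * rbd_img_request_destroy() drops it via rbd_dev_parent_put(), so the
 * parent fields are torn down only once no image request can still
 * reach them.
 */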

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
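
/*
 * Worked example (hypothetical numbers): given img_extents
 * { 0~4096, 8192~4096 } and a parent overlap of 10240 bytes, the first
 * extent is untouched and the second is trimmed to 8192~2048; with an
 * overlap of 8192 the second extent is dropped entirely.
 */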
86bd7998
ID
1722/*
1723 * Determine the byte range(s) covered by either just the object extent
1724 * or the entire object in the parent image.
1725 */
1726static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1727 bool entire)
e93f3152 1728{
86bd7998
ID
1729 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1730 int ret;
e93f3152 1731
86bd7998
ID
1732 if (!rbd_dev->parent_overlap)
1733 return 0;
e93f3152 1734
86bd7998
ID
1735 ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
1736 entire ? 0 : obj_req->ex.oe_off,
1737 entire ? rbd_dev->layout.object_size :
1738 obj_req->ex.oe_len,
1739 &obj_req->img_extents,
1740 &obj_req->num_img_extents);
1741 if (ret)
1742 return ret;
e93f3152 1743
86bd7998
ID
1744 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1745 rbd_dev->parent_overlap);
1746 return 0;
e93f3152
AE
1747}
1748
3da691bf 1749static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1217857f 1750{
ecc633ca 1751 switch (obj_req->img_request->data_type) {
3da691bf
ID
1752 case OBJ_REQUEST_BIO:
1753 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1754 &obj_req->bio_pos,
43df3d35 1755 obj_req->ex.oe_len);
3da691bf
ID
1756 break;
1757 case OBJ_REQUEST_BVECS:
afb97888 1758 case OBJ_REQUEST_OWN_BVECS:
3da691bf 1759 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
43df3d35 1760 obj_req->ex.oe_len);
afb97888 1761 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
3da691bf
ID
1762 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1763 &obj_req->bvec_pos);
1764 break;
1765 default:
1766 rbd_assert(0);
1217857f 1767 }
3da691bf 1768}
1217857f 1769
3da691bf
ID
1770static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1771{
a162b308 1772 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
3da691bf
ID
1773 if (!obj_req->osd_req)
1774 return -ENOMEM;
2a842aca 1775
3da691bf 1776 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
43df3d35 1777 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1778 rbd_osd_req_setup_data(obj_req, 0);
7ad18afa 1779
3da691bf
ID
1780 rbd_osd_req_format_read(obj_req);
1781 return 0;
1782}
1783
1784static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1785 unsigned int which)
1786{
1787 struct page **pages;
8b3e1a56 1788
3da691bf
ID
1789 /*
1790 * The response data for a STAT call consists of:
1791 * le64 length;
1792 * struct {
1793 * le32 tv_sec;
1794 * le32 tv_nsec;
1795 * } mtime;
1796 */
1797 pages = ceph_alloc_page_vector(1, GFP_NOIO);
1798 if (IS_ERR(pages))
1799 return PTR_ERR(pages);
1800
1801 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1802 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1803 8 + sizeof(struct ceph_timespec),
1804 0, false, true);
1805 return 0;
1217857f
AE
1806}
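/*
 * Illustrative sketch (not part of the driver): decoding the STAT
 * reply laid out in the comment above (le64 length + ceph_timespec)
 * from a raw buffer in user space.  The buffer contents are invented
 * for the example.
 */
#if 0
#include <stdint.h>
#include <stdio.h>

static uint64_t get_le64(const uint8_t *p)
{
	uint64_t v = 0;
	int i;

	for (i = 7; i >= 0; i--)	/* p[7] is the most significant byte */
		v = (v << 8) | p[i];
	return v;
}

static uint32_t get_le32(const uint8_t *p)
{
	return p[0] | p[1] << 8 | (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

int main(void)
{
	/* 16-byte reply: length = 4096, mtime = 1234.000000005 */
	uint8_t buf[16] = { 0 };

	buf[1] = 0x10;			/* le64 length = 0x1000 */
	buf[8] = 0xd2; buf[9] = 0x04;	/* le32 tv_sec = 1234 */
	buf[12] = 0x05;			/* le32 tv_nsec = 5 */

	printf("len %llu mtime %u.%09u\n",
	       (unsigned long long)get_le64(&buf[0]),
	       get_le32(&buf[8]), get_le32(&buf[12]));
	return 0;
}
#endif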
1807
3da691bf
ID
1808static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1809 unsigned int which)
2169238d 1810{
3da691bf
ID
1811 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1812 u16 opcode;
2169238d 1813
3da691bf
ID
1814 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1815 rbd_dev->layout.object_size,
1816 rbd_dev->layout.object_size);
2169238d 1817
3da691bf
ID
1818 if (rbd_obj_is_entire(obj_req))
1819 opcode = CEPH_OSD_OP_WRITEFULL;
1820 else
1821 opcode = CEPH_OSD_OP_WRITE;
2169238d 1822
3da691bf 1823 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
43df3d35 1824 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
3da691bf 1825 rbd_osd_req_setup_data(obj_req, which++);
2169238d 1826
3da691bf
ID
1827 rbd_assert(which == obj_req->osd_req->r_num_ops);
1828 rbd_osd_req_format_write(obj_req);
1829}
2169238d 1830
3da691bf
ID
1831static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1832{
3da691bf
ID
1833 unsigned int num_osd_ops, which = 0;
1834 int ret;
1835
86bd7998
ID
1836 /* reverse map the entire object onto the parent */
1837 ret = rbd_obj_calc_img_extents(obj_req, true);
1838 if (ret)
1839 return ret;
1840
1841 if (obj_req->num_img_extents) {
3da691bf
ID
1842 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1843 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1844 } else {
1845 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1846 num_osd_ops = 2; /* setallochint + write/writefull */
2169238d
AE
1847 }
1848
a162b308 1849 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1850 if (!obj_req->osd_req)
1851 return -ENOMEM;
2169238d 1852
86bd7998 1853 if (obj_req->num_img_extents) {
3da691bf
ID
1854 ret = __rbd_obj_setup_stat(obj_req, which++);
1855 if (ret)
1856 return ret;
1857 }
1858
1859 __rbd_obj_setup_write(obj_req, which);
1860 return 0;
2169238d
AE
1861}
1862
6484cbe9
ID
1863static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
1864{
1865 return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
1866 CEPH_OSD_OP_ZERO;
1867}
1868
1869static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1870{
0c93e1b7
ID
1871 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1872 u64 off = obj_req->ex.oe_off;
1873 u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
6484cbe9
ID
1874 int ret;
1875
0c93e1b7
ID
1876 /*
1877 * Align the range to alloc_size boundary and punt on discards
1878 * that are too small to free up any space.
1879 *
1880 * alloc_size == object_size && is_tail() is a special case for
1881 * filestore with filestore_punch_hole = false, needed to allow
1882 * truncate (in addition to delete).
1883 */
1884 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
1885 !rbd_obj_is_tail(obj_req)) {
1886 off = round_up(off, rbd_dev->opts->alloc_size);
1887 next_off = round_down(next_off, rbd_dev->opts->alloc_size);
1888 if (off >= next_off)
1889 return 1;
1890 }
1891
6484cbe9
ID
1892 /* reverse map the entire object onto the parent */
1893 ret = rbd_obj_calc_img_extents(obj_req, true);
1894 if (ret)
1895 return ret;
1896
1897 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1898 if (!obj_req->osd_req)
1899 return -ENOMEM;
1900
1901 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
1902 osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
1903 } else {
0c93e1b7
ID
1904 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
1905 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
1906 off, next_off - off);
6484cbe9
ID
1907 osd_req_op_extent_init(obj_req->osd_req, 0,
1908 truncate_or_zero_opcode(obj_req),
0c93e1b7 1909 off, next_off - off, 0, 0);
6484cbe9
ID
1910 }
1911
1912 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1913 rbd_osd_req_format_write(obj_req);
1914 return 0;
1915}
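/*
 * Illustrative sketch (not part of the driver): the alloc_size
 * alignment done above, in user space.  With a hypothetical 64k
 * alloc_size, a discard that doesn't span a full allocation unit is
 * punted (it would free nothing); otherwise it is shrunk to the
 * aligned middle portion.
 */
#if 0
#include <stdio.h>

#define ALLOC_SIZE 65536ULL	/* hypothetical rbd_dev->opts->alloc_size */

static unsigned long long round_up64(unsigned long long x,
				     unsigned long long a)
{
	return (x + a - 1) / a * a;
}

static unsigned long long round_down64(unsigned long long x,
				       unsigned long long a)
{
	return x / a * a;
}

static void clip(unsigned long long off, unsigned long long len)
{
	unsigned long long start = round_up64(off, ALLOC_SIZE);
	unsigned long long end = round_down64(off + len, ALLOC_SIZE);

	if (start >= end)
		printf("%llu~%llu: punted, frees nothing\n", off, len);
	else
		printf("%llu~%llu -> %llu~%llu\n", off, len, start, end - start);
}

int main(void)
{
	clip(1000, 5000);	/* entirely inside one unit: punted */
	clip(1000, 200000);	/* shrinks to 65536~131072 */
	return 0;
}
#endif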
1916
1917static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
3da691bf
ID
1918 unsigned int which)
1919{
3b434a2a
JD
1920 u16 opcode;
1921
3da691bf 1922 if (rbd_obj_is_entire(obj_req)) {
86bd7998 1923 if (obj_req->num_img_extents) {
2bb1e56e
ID
1924 osd_req_op_init(obj_req->osd_req, which++,
1925 CEPH_OSD_OP_CREATE, 0);
3b434a2a
JD
1926 opcode = CEPH_OSD_OP_TRUNCATE;
1927 } else {
3da691bf
ID
1928 osd_req_op_init(obj_req->osd_req, which++,
1929 CEPH_OSD_OP_DELETE, 0);
1930 opcode = 0;
3b434a2a 1931 }
3b434a2a 1932 } else {
6484cbe9 1933 opcode = truncate_or_zero_opcode(obj_req);
3b434a2a
JD
1934 }
1935
3da691bf
ID
1936 if (opcode)
1937 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
43df3d35 1938 obj_req->ex.oe_off, obj_req->ex.oe_len,
3da691bf
ID
1939 0, 0);
1940
1941 rbd_assert(which == obj_req->osd_req->r_num_ops);
1942 rbd_osd_req_format_write(obj_req);
3b434a2a
JD
1943}
1944
6484cbe9 1945static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
bf0d5f50 1946{
3da691bf
ID
1947 unsigned int num_osd_ops, which = 0;
1948 int ret;
37206ee5 1949
86bd7998
ID
1950 /* reverse map the entire object onto the parent */
1951 ret = rbd_obj_calc_img_extents(obj_req, true);
1952 if (ret)
1953 return ret;
f1a4739f 1954
3da691bf
ID
1955 if (rbd_obj_is_entire(obj_req)) {
1956 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2bb1e56e
ID
1957 if (obj_req->num_img_extents)
1958 num_osd_ops = 2; /* create + truncate */
1959 else
1960 num_osd_ops = 1; /* delete */
3da691bf 1961 } else {
86bd7998 1962 if (obj_req->num_img_extents) {
3da691bf
ID
1963 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1964 num_osd_ops = 2; /* stat + truncate/zero */
1965 } else {
1966 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1967 num_osd_ops = 1; /* truncate/zero */
1968 }
f1a4739f
AE
1969 }
1970
a162b308 1971 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
1972 if (!obj_req->osd_req)
1973 return -ENOMEM;
bf0d5f50 1974
86bd7998 1975 if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
3da691bf
ID
1976 ret = __rbd_obj_setup_stat(obj_req, which++);
1977 if (ret)
1978 return ret;
1979 }
3b434a2a 1980
6484cbe9 1981 __rbd_obj_setup_zeroout(obj_req, which);
3da691bf
ID
1982 return 0;
1983}
9d4df01f 1984
3da691bf
ID
1985/*
1986 * For each object request in @img_req, allocate an OSD request, add
1987 * individual OSD ops and prepare them for submission. The number of
1988 * OSD ops depends on op_type and the overlap point (if any).
1989 */
1990static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1991{
0c93e1b7 1992 struct rbd_obj_request *obj_req, *next_obj_req;
3da691bf 1993 int ret;
430c28c3 1994
0c93e1b7 1995 for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
9bb0248d 1996 switch (img_req->op_type) {
3da691bf
ID
1997 case OBJ_OP_READ:
1998 ret = rbd_obj_setup_read(obj_req);
1999 break;
2000 case OBJ_OP_WRITE:
2001 ret = rbd_obj_setup_write(obj_req);
2002 break;
2003 case OBJ_OP_DISCARD:
2004 ret = rbd_obj_setup_discard(obj_req);
2005 break;
6484cbe9
ID
2006 case OBJ_OP_ZEROOUT:
2007 ret = rbd_obj_setup_zeroout(obj_req);
2008 break;
3da691bf
ID
2009 default:
2010 rbd_assert(0);
2011 }
0c93e1b7 2012 if (ret < 0)
3da691bf 2013 return ret;
0c93e1b7
ID
2014 if (ret > 0) {
2015 img_req->xferred += obj_req->ex.oe_len;
2016 img_req->pending_count--;
2017 rbd_img_obj_request_del(img_req, obj_req);
2018 continue;
2019 }
26f887e0
ID
2020
2021 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
2022 if (ret)
2023 return ret;
bf0d5f50
AE
2024 }
2025
2026 return 0;
3da691bf 2027}
bf0d5f50 2028
5a237819
ID
2029union rbd_img_fill_iter {
2030 struct ceph_bio_iter bio_iter;
2031 struct ceph_bvec_iter bvec_iter;
2032};
bf0d5f50 2033
5a237819
ID
2034struct rbd_img_fill_ctx {
2035 enum obj_request_type pos_type;
2036 union rbd_img_fill_iter *pos;
2037 union rbd_img_fill_iter iter;
2038 ceph_object_extent_fn_t set_pos_fn;
afb97888
ID
2039 ceph_object_extent_fn_t count_fn;
2040 ceph_object_extent_fn_t copy_fn;
5a237819 2041};
bf0d5f50 2042
5a237819 2043static struct ceph_object_extent *alloc_object_extent(void *arg)
0eefd470 2044{
5a237819
ID
2045 struct rbd_img_request *img_req = arg;
2046 struct rbd_obj_request *obj_req;
0eefd470 2047
5a237819
ID
2048 obj_req = rbd_obj_request_create();
2049 if (!obj_req)
2050 return NULL;
2761713d 2051
5a237819
ID
2052 rbd_img_obj_request_add(img_req, obj_req);
2053 return &obj_req->ex;
2054}
0eefd470 2055
afb97888
ID
2056/*
2057 * While su != os && sc == 1 is technically not fancy (it's the same
2058 * layout as su == os && sc == 1), we can't use the nocopy path for it
2059 * because ->set_pos_fn() should be called only once per object.
2060 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2061 * treat su != os && sc == 1 as fancy.
2062 */
2063static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2064{
2065 return l->stripe_unit != l->object_size;
2066}
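/*
 * Illustrative sketch (not part of the driver): why su != os matters
 * even with sc == 1.  With a hypothetical 64k stripe unit, a
 * contiguous file range is walked one stripe unit at a time, so a
 * per-object callback would fire several times for the same object --
 * which is exactly why such layouts take the counting/copying path
 * below instead of the nocopy path.
 */
#if 0
#include <stdio.h>

#define STRIPE_UNIT 65536ULL	/* hypothetical su, smaller than os */

int main(void)
{
	unsigned long long off = 10000, len = 160000;
	int calls = 0;

	while (len) {
		unsigned long long in_su = STRIPE_UNIT - off % STRIPE_UNIT;
		unsigned long long step = len < in_su ? len : in_su;

		/* one action_fn invocation per stripe-unit chunk */
		printf("callback %d: %llu~%llu\n", ++calls, off, step);
		off += step;
		len -= step;
	}
	return 0;	/* prints three callbacks for one file extent */
}
#endif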
0eefd470 2067
afb97888
ID
2068static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2069 struct ceph_file_extent *img_extents,
2070 u32 num_img_extents,
2071 struct rbd_img_fill_ctx *fctx)
2072{
2073 u32 i;
2074 int ret;
2075
2076 img_req->data_type = fctx->pos_type;
0eefd470
AE
2077
2078 /*
afb97888
ID
2079 * Create object requests and set each object request's starting
2080 * position in the provided bio (list) or bio_vec array.
0eefd470 2081 */
afb97888
ID
2082 fctx->iter = *fctx->pos;
2083 for (i = 0; i < num_img_extents; i++) {
2084 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2085 img_extents[i].fe_off,
2086 img_extents[i].fe_len,
2087 &img_req->object_extents,
2088 alloc_object_extent, img_req,
2089 fctx->set_pos_fn, &fctx->iter);
2090 if (ret)
2091 return ret;
2092 }
0eefd470 2093
afb97888 2094 return __rbd_img_fill_request(img_req);
0eefd470
AE
2095}
2096
5a237819
ID
2097/*
2098 * Map a list of image extents to a list of object extents, create the
2099 * corresponding object requests (normally each to a different object,
2100 * but not always) and add them to @img_req. For each object request,
afb97888 2101 * set up its data descriptor to point to the corresponding chunk(s) of
5a237819
ID
2102 * @fctx->pos data buffer.
2103 *
afb97888
ID
2104 * Because ceph_file_to_extents() will merge adjacent object extents
2105 * together, each object request's data descriptor may point to multiple
2106 * different chunks of @fctx->pos data buffer.
2107 *
5a237819
ID
2108 * @fctx->pos data buffer is assumed to be large enough.
2109 */
2110static int rbd_img_fill_request(struct rbd_img_request *img_req,
2111 struct ceph_file_extent *img_extents,
2112 u32 num_img_extents,
2113 struct rbd_img_fill_ctx *fctx)
3d7efd18 2114{
afb97888
ID
2115 struct rbd_device *rbd_dev = img_req->rbd_dev;
2116 struct rbd_obj_request *obj_req;
5a237819
ID
2117 u32 i;
2118 int ret;
2119
afb97888
ID
2120 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2121 !rbd_layout_is_fancy(&rbd_dev->layout))
2122 return rbd_img_fill_request_nocopy(img_req, img_extents,
2123 num_img_extents, fctx);
3d7efd18 2124
afb97888 2125 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
0eefd470 2126
bbea1c1a 2127 /*
afb97888
ID
2128 * Create object requests and determine ->bvec_count for each object
2129 * request. Note that ->bvec_count sum over all object requests may
2130 * be greater than the number of bio_vecs in the provided bio (list)
2131 * or bio_vec array because when mapped, those bio_vecs can straddle
2132 * stripe unit boundaries.
bbea1c1a 2133 */
5a237819
ID
2134 fctx->iter = *fctx->pos;
2135 for (i = 0; i < num_img_extents; i++) {
afb97888 2136 ret = ceph_file_to_extents(&rbd_dev->layout,
5a237819
ID
2137 img_extents[i].fe_off,
2138 img_extents[i].fe_len,
2139 &img_req->object_extents,
2140 alloc_object_extent, img_req,
afb97888
ID
2141 fctx->count_fn, &fctx->iter);
2142 if (ret)
2143 return ret;
bbea1c1a 2144 }
0eefd470 2145
afb97888
ID
2146 for_each_obj_request(img_req, obj_req) {
2147 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2148 sizeof(*obj_req->bvec_pos.bvecs),
2149 GFP_NOIO);
2150 if (!obj_req->bvec_pos.bvecs)
2151 return -ENOMEM;
2152 }
0eefd470 2153
8785b1d4 2154 /*
afb97888
ID
2155 * Fill in each object request's private bio_vec array, splitting and
2156 * rearranging the provided bio_vecs in stripe unit chunks as needed.
8785b1d4 2157 */
afb97888
ID
2158 fctx->iter = *fctx->pos;
2159 for (i = 0; i < num_img_extents; i++) {
2160 ret = ceph_iterate_extents(&rbd_dev->layout,
2161 img_extents[i].fe_off,
2162 img_extents[i].fe_len,
2163 &img_req->object_extents,
2164 fctx->copy_fn, &fctx->iter);
5a237819
ID
2165 if (ret)
2166 return ret;
2167 }
3d7efd18 2168
5a237819
ID
2169 return __rbd_img_fill_request(img_req);
2170}
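/*
 * Illustrative sketch (not part of the driver): the count/alloc/copy
 * pattern used above, reduced to flat chunks in user space.  The first
 * pass over the input counts how many stripe-unit-sized pieces result,
 * an exactly-sized array is then allocated, and a second pass over the
 * same input fills it in.  The 4-byte "stripe unit" and chunk values
 * are invented.
 */
#if 0
#include <stdio.h>
#include <stdlib.h>

#define SU 4	/* hypothetical stripe unit, in bytes */

struct chunk { int off, len; };

/* walk @in, invoking @fn once per stripe-unit-sized piece */
static void for_each_piece(const struct chunk *in, int n,
			   void (*fn)(int off, int len, void *arg), void *arg)
{
	int i;

	for (i = 0; i < n; i++) {
		int off = in[i].off, len = in[i].len;

		while (len) {
			int step = SU - off % SU;

			if (step > len)
				step = len;
			fn(off, step, arg);
			off += step;
			len -= step;
		}
	}
}

static void count_fn(int off, int len, void *arg)
{
	(void)off; (void)len;
	(*(int *)arg)++;
}

static void copy_fn(int off, int len, void *arg)
{
	struct chunk **next = arg;

	(*next)->off = off;
	(*next)->len = len;
	(*next)++;
}

int main(void)
{
	struct chunk in[] = { { 2, 6 }, { 9, 3 } };	/* straddle SU */
	struct chunk *out, *p;
	int count = 0;

	for_each_piece(in, 2, count_fn, &count);	/* pass 1: count */
	out = p = calloc(count, sizeof(*out));		/* exact alloc */
	if (!out)
		return 1;
	for_each_piece(in, 2, copy_fn, &p);		/* pass 2: copy */
	printf("2 input chunks became %d pieces\n", count);
	free(out);
	return 0;
}
#endif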
2171
2172static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2173 u64 off, u64 len)
2174{
2175 struct ceph_file_extent ex = { off, len };
2176 union rbd_img_fill_iter dummy;
2177 struct rbd_img_fill_ctx fctx = {
2178 .pos_type = OBJ_REQUEST_NODATA,
2179 .pos = &dummy,
2180 };
2181
2182 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2183}
2184
2185static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2186{
2187 struct rbd_obj_request *obj_req =
2188 container_of(ex, struct rbd_obj_request, ex);
2189 struct ceph_bio_iter *it = arg;
3d7efd18 2190
5a237819
ID
2191 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2192 obj_req->bio_pos = *it;
2193 ceph_bio_iter_advance(it, bytes);
2194}
3d7efd18 2195
afb97888
ID
2196static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2197{
2198 struct rbd_obj_request *obj_req =
2199 container_of(ex, struct rbd_obj_request, ex);
2200 struct ceph_bio_iter *it = arg;
0eefd470 2201
afb97888
ID
2202 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2203 ceph_bio_iter_advance_step(it, bytes, ({
2204 obj_req->bvec_count++;
2205 }));
0eefd470 2206
afb97888 2207}
0eefd470 2208
afb97888
ID
2209static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2210{
2211 struct rbd_obj_request *obj_req =
2212 container_of(ex, struct rbd_obj_request, ex);
2213 struct ceph_bio_iter *it = arg;
0eefd470 2214
afb97888
ID
2215 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2216 ceph_bio_iter_advance_step(it, bytes, ({
2217 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2218 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2219 }));
3d7efd18
AE
2220}
2221
5a237819
ID
2222static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2223 struct ceph_file_extent *img_extents,
2224 u32 num_img_extents,
2225 struct ceph_bio_iter *bio_pos)
2226{
2227 struct rbd_img_fill_ctx fctx = {
2228 .pos_type = OBJ_REQUEST_BIO,
2229 .pos = (union rbd_img_fill_iter *)bio_pos,
2230 .set_pos_fn = set_bio_pos,
afb97888
ID
2231 .count_fn = count_bio_bvecs,
2232 .copy_fn = copy_bio_bvecs,
5a237819 2233 };
3d7efd18 2234
5a237819
ID
2235 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2236 &fctx);
2237}
3d7efd18 2238
5a237819
ID
2239static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2240 u64 off, u64 len, struct bio *bio)
2241{
2242 struct ceph_file_extent ex = { off, len };
2243 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
3d7efd18 2244
5a237819
ID
2245 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2246}
a9e8ba2c 2247
5a237819
ID
2248static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2249{
2250 struct rbd_obj_request *obj_req =
2251 container_of(ex, struct rbd_obj_request, ex);
2252 struct ceph_bvec_iter *it = arg;
3d7efd18 2253
5a237819
ID
2254 obj_req->bvec_pos = *it;
2255 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2256 ceph_bvec_iter_advance(it, bytes);
2257}
3d7efd18 2258
afb97888
ID
2259static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2260{
2261 struct rbd_obj_request *obj_req =
2262 container_of(ex, struct rbd_obj_request, ex);
2263 struct ceph_bvec_iter *it = arg;
058aa991 2264
afb97888
ID
2265 ceph_bvec_iter_advance_step(it, bytes, ({
2266 obj_req->bvec_count++;
2267 }));
2268}
058aa991 2269
afb97888
ID
2270static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2271{
2272 struct rbd_obj_request *obj_req =
2273 container_of(ex, struct rbd_obj_request, ex);
2274 struct ceph_bvec_iter *it = arg;
3d7efd18 2275
afb97888
ID
2276 ceph_bvec_iter_advance_step(it, bytes, ({
2277 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2278 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2279 }));
3d7efd18
AE
2280}
2281
5a237819
ID
2282static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2283 struct ceph_file_extent *img_extents,
2284 u32 num_img_extents,
2285 struct ceph_bvec_iter *bvec_pos)
c5b5ef6c 2286{
5a237819
ID
2287 struct rbd_img_fill_ctx fctx = {
2288 .pos_type = OBJ_REQUEST_BVECS,
2289 .pos = (union rbd_img_fill_iter *)bvec_pos,
2290 .set_pos_fn = set_bvec_pos,
afb97888
ID
2291 .count_fn = count_bvecs,
2292 .copy_fn = copy_bvecs,
5a237819 2293 };
c5b5ef6c 2294
5a237819
ID
2295 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2296 &fctx);
2297}
c5b5ef6c 2298
5a237819
ID
2299static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2300 struct ceph_file_extent *img_extents,
2301 u32 num_img_extents,
2302 struct bio_vec *bvecs)
2303{
2304 struct ceph_bvec_iter it = {
2305 .bvecs = bvecs,
2306 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2307 num_img_extents) },
2308 };
c5b5ef6c 2309
5a237819
ID
2310 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2311 &it);
2312}
c5b5ef6c 2313
efbd1a11 2314static void rbd_img_request_submit(struct rbd_img_request *img_request)
bf0d5f50 2315{
bf0d5f50 2316 struct rbd_obj_request *obj_request;
c5b5ef6c 2317
37206ee5 2318 dout("%s: img %p\n", __func__, img_request);
c2e82414 2319
663ae2cc 2320 rbd_img_request_get(img_request);
efbd1a11 2321 for_each_obj_request(img_request, obj_request)
3da691bf 2322 rbd_obj_request_submit(obj_request);
c2e82414 2323
663ae2cc 2324 rbd_img_request_put(img_request);
c5b5ef6c
AE
2325}
2326
86bd7998 2327static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
c5b5ef6c 2328{
3da691bf
ID
2329 struct rbd_img_request *img_req = obj_req->img_request;
2330 struct rbd_img_request *child_img_req;
c5b5ef6c
AE
2331 int ret;
2332
e93aca0a
ID
2333 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2334 OBJ_OP_READ, NULL);
3da691bf 2335 if (!child_img_req)
710214e3
ID
2336 return -ENOMEM;
2337
e93aca0a
ID
2338 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2339 child_img_req->obj_request = obj_req;
a90bb0c1 2340
3da691bf 2341 if (!rbd_img_is_write(img_req)) {
ecc633ca 2342 switch (img_req->data_type) {
3da691bf 2343 case OBJ_REQUEST_BIO:
5a237819
ID
2344 ret = __rbd_img_fill_from_bio(child_img_req,
2345 obj_req->img_extents,
2346 obj_req->num_img_extents,
2347 &obj_req->bio_pos);
3da691bf
ID
2348 break;
2349 case OBJ_REQUEST_BVECS:
afb97888 2350 case OBJ_REQUEST_OWN_BVECS:
5a237819
ID
2351 ret = __rbd_img_fill_from_bvecs(child_img_req,
2352 obj_req->img_extents,
2353 obj_req->num_img_extents,
2354 &obj_req->bvec_pos);
3da691bf
ID
2355 break;
2356 default:
2357 rbd_assert(0);
2358 }
2359 } else {
5a237819
ID
2360 ret = rbd_img_fill_from_bvecs(child_img_req,
2361 obj_req->img_extents,
2362 obj_req->num_img_extents,
2363 obj_req->copyup_bvecs);
3da691bf
ID
2364 }
2365 if (ret) {
2366 rbd_img_request_put(child_img_req);
2367 return ret;
2368 }
2369
2370 rbd_img_request_submit(child_img_req);
2371 return 0;
2372}
2373
2374static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2375{
2376 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2377 int ret;
2378
2379 if (obj_req->result == -ENOENT &&
86bd7998
ID
2380 rbd_dev->parent_overlap && !obj_req->tried_parent) {
2381 /* reverse map this object extent onto the parent */
2382 ret = rbd_obj_calc_img_extents(obj_req, false);
3da691bf
ID
2383 if (ret) {
2384 obj_req->result = ret;
2385 return true;
2386 }
86bd7998
ID
2387
2388 if (obj_req->num_img_extents) {
2389 obj_req->tried_parent = true;
2390 ret = rbd_obj_read_from_parent(obj_req);
2391 if (ret) {
2392 obj_req->result = ret;
2393 return true;
2394 }
2395 return false;
2396 }
710214e3
ID
2397 }
2398
c5b5ef6c 2399 /*
3da691bf
ID
2400 * -ENOENT means a hole in the image -- zero-fill the entire
2401 * length of the request. A short read also implies zero-fill
2402 * to the end of the request. In both cases we update xferred
2403 * count to indicate the whole request was satisfied.
c5b5ef6c 2404 */
3da691bf 2405 if (obj_req->result == -ENOENT ||
43df3d35 2406 (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
3da691bf
ID
2407 rbd_assert(!obj_req->xferred || !obj_req->result);
2408 rbd_obj_zero_range(obj_req, obj_req->xferred,
43df3d35 2409 obj_req->ex.oe_len - obj_req->xferred);
3da691bf 2410 obj_req->result = 0;
43df3d35 2411 obj_req->xferred = obj_req->ex.oe_len;
710214e3 2412 }
c5b5ef6c 2413
3da691bf
ID
2414 return true;
2415}
c5b5ef6c 2416
3da691bf
ID
2417/*
2418 * copyup_bvecs pages are never highmem pages
2419 */
2420static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2421{
2422 struct ceph_bvec_iter it = {
2423 .bvecs = bvecs,
2424 .iter = { .bi_size = bytes },
2425 };
c5b5ef6c 2426
3da691bf
ID
2427 ceph_bvec_iter_advance_step(&it, bytes, ({
2428 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2429 bv.bv_len))
2430 return false;
2431 }));
2432 return true;
c5b5ef6c
AE
2433}
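/*
 * Illustrative sketch (not part of the driver): memchr_inv() is
 * kernel-only, so a user-space equivalent of the zero check above
 * might look like this.
 */
#if 0
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static bool is_zero(const unsigned char *p, size_t len)
{
	size_t i;

	for (i = 0; i < len; i++)
		if (p[i])
			return false;	/* found a non-zero byte */
	return true;
}

int main(void)
{
	unsigned char buf[4096] = { 0 };

	printf("%d\n", is_zero(buf, sizeof(buf)));	/* 1 */
	buf[100] = 0xff;
	printf("%d\n", is_zero(buf, sizeof(buf)));	/* 0 */
	return 0;
}
#endif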
2434
3da691bf 2435static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
b454e36d 2436{
3da691bf 2437 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
fe943d50 2438 int ret;
70d045f6 2439
3da691bf
ID
2440 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2441 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2442 rbd_osd_req_destroy(obj_req->osd_req);
70d045f6 2443
b454e36d 2444 /*
3da691bf
ID
2445 * Create a copyup request with the same number of OSD ops as
2446 * the original request. The original request was stat + op(s),
2447 * the new copyup request will be copyup + the same op(s).
b454e36d 2448 */
a162b308 2449 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
2450 if (!obj_req->osd_req)
2451 return -ENOMEM;
b454e36d 2452
24639ce5 2453 ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
fe943d50
CX
2454 if (ret)
2455 return ret;
2456
c622d226 2457 /*
3da691bf
ID
2458 * Only send non-zero copyup data to save some I/O and network
2459 * bandwidth -- zero copyup data is equivalent to the object not
2460 * existing.
c622d226 2461 */
3da691bf
ID
2462 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2463 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2464 bytes = 0;
2465 }
3da691bf 2466 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
0010f705
ID
2467 obj_req->copyup_bvecs,
2468 obj_req->copyup_bvec_count,
2469 bytes);
3da691bf 2470
9bb0248d 2471 switch (obj_req->img_request->op_type) {
3da691bf
ID
2472 case OBJ_OP_WRITE:
2473 __rbd_obj_setup_write(obj_req, 1);
2474 break;
6484cbe9 2475 case OBJ_OP_ZEROOUT:
3da691bf 2476 rbd_assert(!rbd_obj_is_entire(obj_req));
6484cbe9 2477 __rbd_obj_setup_zeroout(obj_req, 1);
3da691bf
ID
2478 break;
2479 default:
2480 rbd_assert(0);
2481 }
70d045f6 2482
26f887e0
ID
2483 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
2484 if (ret)
2485 return ret;
2486
3da691bf 2487 rbd_obj_request_submit(obj_req);
3da691bf 2488 return 0;
70d045f6
ID
2489}
2490
7e07efb1 2491static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
70d045f6 2492{
7e07efb1 2493 u32 i;
b454e36d 2494
7e07efb1
ID
2495 rbd_assert(!obj_req->copyup_bvecs);
2496 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2497 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2498 sizeof(*obj_req->copyup_bvecs),
2499 GFP_NOIO);
2500 if (!obj_req->copyup_bvecs)
2501 return -ENOMEM;
b454e36d 2502
7e07efb1
ID
2503 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2504 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2505
2506 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2507 if (!obj_req->copyup_bvecs[i].bv_page)
2508 return -ENOMEM;
3d7efd18 2509
7e07efb1
ID
2510 obj_req->copyup_bvecs[i].bv_offset = 0;
2511 obj_req->copyup_bvecs[i].bv_len = len;
2512 obj_overlap -= len;
2513 }
b454e36d 2514
7e07efb1
ID
2515 rbd_assert(!obj_overlap);
2516 return 0;
b454e36d
AE
2517}
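/*
 * Illustrative sketch (not part of the driver): calc_pages_for(0, len)
 * reduces to DIV_ROUND_UP(len, PAGE_SIZE), and the loop above gives
 * every bvec a full page except the last, which gets the remainder.
 * Assumes a hypothetical 4k page size.
 */
#if 0
#include <stdio.h>

#define PAGE_SZ 4096ULL

int main(void)
{
	unsigned long long overlap = 10000;	/* example copyup size */
	unsigned long long npages = (overlap + PAGE_SZ - 1) / PAGE_SZ;
	unsigned long long i;

	for (i = 0; i < npages; i++) {
		unsigned long long len = overlap < PAGE_SZ ? overlap : PAGE_SZ;

		printf("bvec %llu: len %llu\n", i, len);
		overlap -= len;
	}
	return 0;	/* prints 4096, 4096, 1808 */
}
#endif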
2518
3da691bf 2519static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
bf0d5f50 2520{
3da691bf 2521 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3da691bf 2522 int ret;
bf0d5f50 2523
86bd7998
ID
2524 rbd_assert(obj_req->num_img_extents);
2525 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2526 rbd_dev->parent_overlap);
2527 if (!obj_req->num_img_extents) {
3da691bf
ID
2528 /*
2529 * The overlap has become 0 (most likely because the
2530 * image has been flattened). Use rbd_obj_issue_copyup()
2531 * to re-submit the original write request -- the copyup
2532 * operation itself will be a no-op, since someone must
2533 * have populated the child object while we weren't
2534 * looking. Move to WRITE_FLAT state as we'll be done
2535 * with the operation once the null copyup completes.
2536 */
2537 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2538 return rbd_obj_issue_copyup(obj_req, 0);
bf0d5f50
AE
2539 }
2540
86bd7998 2541 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3da691bf
ID
2542 if (ret)
2543 return ret;
2544
2545 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
86bd7998 2546 return rbd_obj_read_from_parent(obj_req);
bf0d5f50 2547}
8b3e1a56 2548
3da691bf 2549static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
8b3e1a56 2550{
3da691bf 2551 int ret;
8b3e1a56 2552
3da691bf
ID
2553again:
2554 switch (obj_req->write_state) {
2555 case RBD_OBJ_WRITE_GUARD:
2556 rbd_assert(!obj_req->xferred);
2557 if (obj_req->result == -ENOENT) {
2558 /*
2559 * The target object doesn't exist. Read the data for
2560 * the entire target object up to the overlap point (if
2561 * any) from the parent, so we can use it for a copyup.
2562 */
2563 ret = rbd_obj_handle_write_guard(obj_req);
2564 if (ret) {
2565 obj_req->result = ret;
2566 return true;
2567 }
2568 return false;
2569 }
2570 /* fall through */
2571 case RBD_OBJ_WRITE_FLAT:
2572 if (!obj_req->result)
2573 /*
2574 * There is no such thing as a successful short
2575 * write -- indicate the whole request was satisfied.
2576 */
43df3d35 2577 obj_req->xferred = obj_req->ex.oe_len;
3da691bf
ID
2578 return true;
2579 case RBD_OBJ_WRITE_COPYUP:
2580 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2581 if (obj_req->result)
2582 goto again;
8b3e1a56 2583
3da691bf
ID
2584 rbd_assert(obj_req->xferred);
2585 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2586 if (ret) {
2587 obj_req->result = ret;
356889c4 2588 obj_req->xferred = 0;
3da691bf
ID
2589 return true;
2590 }
2591 return false;
2592 default:
c6244b3b 2593 BUG();
3da691bf
ID
2594 }
2595}
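/*
 * Illustrative sketch (not part of the driver): the write state
 * machine above, flattened into a user-space mirror of the goto-again
 * loop.  GUARD + -ENOENT starts the copyup read; COPYUP success issues
 * the copyup; COPYUP failure loops back through GUARD, falls through
 * to FLAT and completes with the error.
 */
#if 0
#include <errno.h>
#include <stdio.h>

enum wstate { FLAT, GUARD, COPYUP };

static const char *handle_write(enum wstate *s, int result)
{
	for (;;) {
		switch (*s) {
		case GUARD:
			if (result == -ENOENT)
				return "object missing: read parent for copyup";
			/* fall through */
		case FLAT:
			return "complete the request";
		case COPYUP:
			*s = GUARD;
			if (result)
				continue;	/* error: loop back as GUARD */
			return "issue the copyup";
		}
	}
}

int main(void)
{
	enum wstate s;

	s = GUARD;
	printf("GUARD, -ENOENT: %s\n", handle_write(&s, -ENOENT));
	s = COPYUP;
	printf("COPYUP, 0:      %s\n", handle_write(&s, 0));
	s = COPYUP;
	printf("COPYUP, -EIO:   %s\n", handle_write(&s, -EIO));
	return 0;
}
#endif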
02c74fba 2596
3da691bf
ID
2597/*
2598 * Returns true if @obj_req is completed, or false otherwise.
2599 */
2600static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2601{
9bb0248d 2602 switch (obj_req->img_request->op_type) {
3da691bf
ID
2603 case OBJ_OP_READ:
2604 return rbd_obj_handle_read(obj_req);
2605 case OBJ_OP_WRITE:
2606 return rbd_obj_handle_write(obj_req);
2607 case OBJ_OP_DISCARD:
6484cbe9 2608 case OBJ_OP_ZEROOUT:
3da691bf
ID
2609 if (rbd_obj_handle_write(obj_req)) {
2610 /*
2611 * Hide -ENOENT from delete/truncate/zero -- discarding
2612 * a non-existent object is not a problem.
2613 */
2614 if (obj_req->result == -ENOENT) {
2615 obj_req->result = 0;
43df3d35 2616 obj_req->xferred = obj_req->ex.oe_len;
3da691bf
ID
2617 }
2618 return true;
2619 }
2620 return false;
2621 default:
c6244b3b 2622 BUG();
3da691bf
ID
2623 }
2624}
02c74fba 2625
7114edac
ID
2626static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2627{
2628 struct rbd_img_request *img_req = obj_req->img_request;
2629
2630 rbd_assert((!obj_req->result &&
43df3d35 2631 obj_req->xferred == obj_req->ex.oe_len) ||
7114edac
ID
2632 (obj_req->result < 0 && !obj_req->xferred));
2633 if (!obj_req->result) {
2634 img_req->xferred += obj_req->xferred;
980917fc 2635 return;
02c74fba 2636 }
a9e8ba2c 2637
7114edac
ID
2638 rbd_warn(img_req->rbd_dev,
2639 "%s at objno %llu %llu~%llu result %d xferred %llu",
43df3d35
ID
2640 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2641 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
7114edac
ID
2642 obj_req->xferred);
2643 if (!img_req->result) {
2644 img_req->result = obj_req->result;
2645 img_req->xferred = 0;
2646 }
2647}
a9e8ba2c 2648
3da691bf
ID
2649static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2650{
2651 struct rbd_obj_request *obj_req = img_req->obj_request;
a9e8ba2c 2652
3da691bf 2653 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
86bd7998
ID
2654 rbd_assert((!img_req->result &&
2655 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2656 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2657
3da691bf
ID
2658 obj_req->result = img_req->result;
2659 obj_req->xferred = img_req->xferred;
2660 rbd_img_request_put(img_req);
8b3e1a56
AE
2661}
2662
7114edac 2663static void rbd_img_end_request(struct rbd_img_request *img_req)
8b3e1a56 2664{
7114edac
ID
2665 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2666 rbd_assert((!img_req->result &&
2667 img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2668 (img_req->result < 0 && !img_req->xferred));
8b3e1a56 2669
7114edac
ID
2670 blk_mq_end_request(img_req->rq,
2671 errno_to_blk_status(img_req->result));
2672 rbd_img_request_put(img_req);
3da691bf 2673}
8b3e1a56 2674
3da691bf
ID
2675static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2676{
7114edac 2677 struct rbd_img_request *img_req;
8b3e1a56 2678
7114edac 2679again:
3da691bf
ID
2680 if (!__rbd_obj_handle_request(obj_req))
2681 return;
8b3e1a56 2682
7114edac
ID
2683 img_req = obj_req->img_request;
2684 spin_lock(&img_req->completion_lock);
2685 rbd_obj_end_request(obj_req);
2686 rbd_assert(img_req->pending_count);
2687 if (--img_req->pending_count) {
2688 spin_unlock(&img_req->completion_lock);
2689 return;
2690 }
8b3e1a56 2691
7114edac
ID
2692 spin_unlock(&img_req->completion_lock);
2693 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2694 obj_req = img_req->obj_request;
2695 rbd_img_end_child_request(img_req);
2696 goto again;
2697 }
2698 rbd_img_end_request(img_req);
8b3e1a56 2699}
bf0d5f50 2700
ed95b21a 2701static const struct rbd_client_id rbd_empty_cid;
b8d70035 2702
ed95b21a
ID
2703static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2704 const struct rbd_client_id *rhs)
2705{
2706 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2707}
2708
2709static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2710{
2711 struct rbd_client_id cid;
2712
2713 mutex_lock(&rbd_dev->watch_mutex);
2714 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2715 cid.handle = rbd_dev->watch_cookie;
2716 mutex_unlock(&rbd_dev->watch_mutex);
2717 return cid;
2718}
2719
2720/*
2721 * lock_rwsem must be held for write
2722 */
2723static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2724 const struct rbd_client_id *cid)
2725{
2726 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2727 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2728 cid->gid, cid->handle);
2729 rbd_dev->owner_cid = *cid; /* struct */
2730}
2731
2732static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2733{
2734 mutex_lock(&rbd_dev->watch_mutex);
2735 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2736 mutex_unlock(&rbd_dev->watch_mutex);
2737}
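/*
 * Illustrative sketch (not part of the driver): the lock cookie is
 * "<prefix> <watch cookie>", written with sprintf() above and parsed
 * back with sscanf() in find_watcher() below.  A user-space round
 * trip, assuming "auto" is the value of RBD_LOCK_COOKIE_PREFIX:
 */
#if 0
#include <stdio.h>

#define COOKIE_PREFIX "auto"	/* assumed RBD_LOCK_COOKIE_PREFIX */

int main(void)
{
	char buf[32];
	unsigned long long watch_cookie = 18446744073709551615ULL, parsed;

	sprintf(buf, "%s %llu", COOKIE_PREFIX, watch_cookie);
	sscanf(buf, COOKIE_PREFIX " %llu", &parsed);
	printf("cookie \"%s\" -> %llu\n", buf, parsed);
	return 0;	/* round-trips even the maximum u64 */
}
#endif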
2738
edd8ca80
FM
2739static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2740{
2741 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2742
2743 strcpy(rbd_dev->lock_cookie, cookie);
2744 rbd_set_owner_cid(rbd_dev, &cid);
2745 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2746}
2747
ed95b21a
ID
2748/*
2749 * lock_rwsem must be held for write
2750 */
2751static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 2752{
922dab61 2753 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2754 char cookie[32];
e627db08 2755 int ret;
b8d70035 2756
cbbfb0ff
ID
2757 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2758 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 2759
ed95b21a
ID
2760 format_lock_cookie(rbd_dev, cookie);
2761 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2762 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2763 RBD_LOCK_TAG, "", 0);
e627db08 2764 if (ret)
ed95b21a 2765 return ret;
b8d70035 2766
ed95b21a 2767 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
edd8ca80 2768 __rbd_lock(rbd_dev, cookie);
ed95b21a 2769 return 0;
b8d70035
AE
2770}
2771
ed95b21a
ID
2772/*
2773 * lock_rwsem must be held for write
2774 */
bbead745 2775static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 2776{
922dab61 2777 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
2778 int ret;
2779
cbbfb0ff
ID
2780 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2781 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 2782
ed95b21a 2783 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 2784 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
2785 if (ret && ret != -ENOENT)
2786 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 2787
bbead745
ID
2788 /* treat errors as the image is unlocked */
2789 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 2790 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
2791 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2792 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
2793}
2794
ed95b21a
ID
2795static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2796 enum rbd_notify_op notify_op,
2797 struct page ***preply_pages,
2798 size_t *preply_len)
9969ebc5
AE
2799{
2800 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2801 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
08a79102
KS
2802 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2803 int buf_size = sizeof(buf);
ed95b21a 2804 void *p = buf;
9969ebc5 2805
ed95b21a 2806 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 2807
ed95b21a
ID
2808 /* encode *LockPayload NotifyMessage (op + ClientId) */
2809 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2810 ceph_encode_32(&p, notify_op);
2811 ceph_encode_64(&p, cid.gid);
2812 ceph_encode_64(&p, cid.handle);
8eb87565 2813
ed95b21a
ID
2814 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2815 &rbd_dev->header_oloc, buf, buf_size,
2816 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
2817}
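/*
 * Illustrative sketch (not part of the driver): the NotifyMessage
 * encoded above is 6 bytes of "start encoding" header (u8 struct_v,
 * u8 compat, le32 payload length) followed by le32 op and the le64
 * gid/handle pair -- 26 bytes in total, matching the buf[] size.  All
 * multi-byte fields are little-endian; the values here are invented.
 */
#if 0
#include <stdint.h>
#include <stdio.h>

static uint8_t *put_le32(uint8_t *p, uint32_t v)
{
	int i;

	for (i = 0; i < 4; i++)
		*p++ = v >> (8 * i);
	return p;
}

static uint8_t *put_le64(uint8_t *p, uint64_t v)
{
	int i;

	for (i = 0; i < 8; i++)
		*p++ = v >> (8 * i);
	return p;
}

int main(void)
{
	uint8_t buf[4 + 8 + 8 + 6], *p = buf;

	*p++ = 2;			/* struct_v */
	*p++ = 1;			/* struct_compat */
	p = put_le32(p, 4 + 8 + 8);	/* payload length */
	p = put_le32(p, 3);		/* notify_op (example value) */
	p = put_le64(p, 4100);		/* cid.gid */
	p = put_le64(p, 94589);		/* cid.handle */

	printf("encoded %zu bytes\n", (size_t)(p - buf));	/* 26 */
	return 0;
}
#endif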
2818
ed95b21a
ID
2819static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2820 enum rbd_notify_op notify_op)
b30a01f2 2821{
ed95b21a
ID
2822 struct page **reply_pages;
2823 size_t reply_len;
b30a01f2 2824
ed95b21a
ID
2825 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2826 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2827}
b30a01f2 2828
ed95b21a
ID
2829static void rbd_notify_acquired_lock(struct work_struct *work)
2830{
2831 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2832 acquired_lock_work);
76756a51 2833
ed95b21a 2834 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
2835}
2836
ed95b21a 2837static void rbd_notify_released_lock(struct work_struct *work)
c525f036 2838{
ed95b21a
ID
2839 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2840 released_lock_work);
811c6688 2841
ed95b21a 2842 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
2843}
2844
ed95b21a 2845static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 2846{
ed95b21a
ID
2847 struct page **reply_pages;
2848 size_t reply_len;
2849 bool lock_owner_responded = false;
36be9a76
AE
2850 int ret;
2851
ed95b21a 2852 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 2853
ed95b21a
ID
2854 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2855 &reply_pages, &reply_len);
2856 if (ret && ret != -ETIMEDOUT) {
2857 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 2858 goto out;
ed95b21a 2859 }
36be9a76 2860
ed95b21a
ID
2861 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2862 void *p = page_address(reply_pages[0]);
2863 void *const end = p + reply_len;
2864 u32 n;
36be9a76 2865
ed95b21a
ID
2866 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2867 while (n--) {
2868 u8 struct_v;
2869 u32 len;
36be9a76 2870
ed95b21a
ID
2871 ceph_decode_need(&p, end, 8 + 8, e_inval);
2872 p += 8 + 8; /* skip gid and cookie */
04017e29 2873
ed95b21a
ID
2874 ceph_decode_32_safe(&p, end, len, e_inval);
2875 if (!len)
2876 continue;
2877
2878 if (lock_owner_responded) {
2879 rbd_warn(rbd_dev,
2880 "duplicate lock owners detected");
2881 ret = -EIO;
2882 goto out;
2883 }
2884
2885 lock_owner_responded = true;
2886 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2887 &struct_v, &len);
2888 if (ret) {
2889 rbd_warn(rbd_dev,
2890 "failed to decode ResponseMessage: %d",
2891 ret);
2892 goto e_inval;
2893 }
2894
2895 ret = ceph_decode_32(&p);
2896 }
2897 }
2898
2899 if (!lock_owner_responded) {
2900 rbd_warn(rbd_dev, "no lock owners detected");
2901 ret = -ETIMEDOUT;
2902 }
2903
2904out:
2905 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2906 return ret;
2907
2908e_inval:
2909 ret = -EINVAL;
2910 goto out;
2911}
2912
2913static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2914{
2915 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2916
2917 cancel_delayed_work(&rbd_dev->lock_dwork);
2918 if (wake_all)
2919 wake_up_all(&rbd_dev->lock_waitq);
2920 else
2921 wake_up(&rbd_dev->lock_waitq);
2922}
2923
2924static int get_lock_owner_info(struct rbd_device *rbd_dev,
2925 struct ceph_locker **lockers, u32 *num_lockers)
2926{
2927 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2928 u8 lock_type;
2929 char *lock_tag;
2930 int ret;
2931
2932 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2933
2934 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2935 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2936 &lock_type, &lock_tag, lockers, num_lockers);
2937 if (ret)
2938 return ret;
2939
2940 if (*num_lockers == 0) {
2941 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2942 goto out;
2943 }
2944
2945 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2946 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2947 lock_tag);
2948 ret = -EBUSY;
2949 goto out;
2950 }
2951
2952 if (lock_type == CEPH_CLS_LOCK_SHARED) {
2953 rbd_warn(rbd_dev, "shared lock type detected");
2954 ret = -EBUSY;
2955 goto out;
2956 }
2957
2958 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2959 strlen(RBD_LOCK_COOKIE_PREFIX))) {
2960 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2961 (*lockers)[0].id.cookie);
2962 ret = -EBUSY;
2963 goto out;
2964 }
2965
2966out:
2967 kfree(lock_tag);
2968 return ret;
2969}
2970
2971static int find_watcher(struct rbd_device *rbd_dev,
2972 const struct ceph_locker *locker)
2973{
2974 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2975 struct ceph_watch_item *watchers;
2976 u32 num_watchers;
2977 u64 cookie;
2978 int i;
2979 int ret;
2980
2981 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2982 &rbd_dev->header_oloc, &watchers,
2983 &num_watchers);
2984 if (ret)
2985 return ret;
2986
2987 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2988 for (i = 0; i < num_watchers; i++) {
2989 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2990 sizeof(locker->info.addr)) &&
2991 watchers[i].cookie == cookie) {
2992 struct rbd_client_id cid = {
2993 .gid = le64_to_cpu(watchers[i].name.num),
2994 .handle = cookie,
2995 };
2996
2997 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2998 rbd_dev, cid.gid, cid.handle);
2999 rbd_set_owner_cid(rbd_dev, &cid);
3000 ret = 1;
3001 goto out;
3002 }
3003 }
3004
3005 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3006 ret = 0;
3007out:
3008 kfree(watchers);
3009 return ret;
3010}
3011
3012/*
3013 * lock_rwsem must be held for write
3014 */
3015static int rbd_try_lock(struct rbd_device *rbd_dev)
3016{
3017 struct ceph_client *client = rbd_dev->rbd_client->client;
3018 struct ceph_locker *lockers;
3019 u32 num_lockers;
3020 int ret;
3021
3022 for (;;) {
3023 ret = rbd_lock(rbd_dev);
3024 if (ret != -EBUSY)
3025 return ret;
3026
3027 /* determine if the current lock holder is still alive */
3028 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3029 if (ret)
3030 return ret;
3031
3032 if (num_lockers == 0)
3033 goto again;
3034
3035 ret = find_watcher(rbd_dev, lockers);
3036 if (ret) {
3037 if (ret > 0)
3038 ret = 0; /* have to request lock */
3039 goto out;
3040 }
3041
3042 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3043 ENTITY_NAME(lockers[0].id.name));
3044
3045 ret = ceph_monc_blacklist_add(&client->monc,
3046 &lockers[0].info.addr);
3047 if (ret) {
3048 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3049 ENTITY_NAME(lockers[0].id.name), ret);
3050 goto out;
3051 }
3052
3053 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3054 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3055 lockers[0].id.cookie,
3056 &lockers[0].id.name);
3057 if (ret && ret != -ENOENT)
3058 goto out;
3059
3060again:
3061 ceph_free_lockers(lockers, num_lockers);
3062 }
3063
3064out:
3065 ceph_free_lockers(lockers, num_lockers);
3066 return ret;
3067}
3068
3069/*
3070 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3071 */
3072static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3073 int *pret)
3074{
3075 enum rbd_lock_state lock_state;
3076
3077 down_read(&rbd_dev->lock_rwsem);
3078 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3079 rbd_dev->lock_state);
3080 if (__rbd_is_lock_owner(rbd_dev)) {
3081 lock_state = rbd_dev->lock_state;
3082 up_read(&rbd_dev->lock_rwsem);
3083 return lock_state;
3084 }
3085
3086 up_read(&rbd_dev->lock_rwsem);
3087 down_write(&rbd_dev->lock_rwsem);
3088 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3089 rbd_dev->lock_state);
3090 if (!__rbd_is_lock_owner(rbd_dev)) {
3091 *pret = rbd_try_lock(rbd_dev);
3092 if (*pret)
3093 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3094 }
3095
3096 lock_state = rbd_dev->lock_state;
3097 up_write(&rbd_dev->lock_rwsem);
3098 return lock_state;
3099}
3100
3101static void rbd_acquire_lock(struct work_struct *work)
3102{
3103 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3104 struct rbd_device, lock_dwork);
3105 enum rbd_lock_state lock_state;
37f13252 3106 int ret = 0;
ed95b21a
ID
3107
3108 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3109again:
3110 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3111 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3112 if (lock_state == RBD_LOCK_STATE_LOCKED)
3113 wake_requests(rbd_dev, true);
3114 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3115 rbd_dev, lock_state, ret);
3116 return;
3117 }
3118
3119 ret = rbd_request_lock(rbd_dev);
3120 if (ret == -ETIMEDOUT) {
3121 goto again; /* treat this as a dead client */
e010dd0a
ID
3122 } else if (ret == -EROFS) {
3123 rbd_warn(rbd_dev, "peer will not release lock");
3124 /*
3125 * If this is rbd_add_acquire_lock(), we want to fail
3126 * immediately -- reuse BLACKLISTED flag. Otherwise we
3127 * want to block.
3128 */
3129 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3130 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3131 /* wake "rbd map --exclusive" process */
3132 wake_requests(rbd_dev, false);
3133 }
ed95b21a
ID
3134 } else if (ret < 0) {
3135 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3136 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3137 RBD_RETRY_DELAY);
3138 } else {
3139 /*
3140 * lock owner acked, but resend if we don't see them
3141 * release the lock
3142 */
3143 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3144 rbd_dev);
3145 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3146 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3147 }
3148}
3149
3150/*
3151 * lock_rwsem must be held for write
3152 */
3153static bool rbd_release_lock(struct rbd_device *rbd_dev)
3154{
3155 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3156 rbd_dev->lock_state);
3157 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3158 return false;
3159
3160 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3161 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3162 /*
ed95b21a 3163 * Ensure that all in-flight IO is flushed.
52bb1f9b 3164 *
ed95b21a
ID
3165 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3166 * may be shared with other devices.
52bb1f9b 3167 */
ed95b21a
ID
3168 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3169 up_read(&rbd_dev->lock_rwsem);
3170
3171 down_write(&rbd_dev->lock_rwsem);
3172 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3173 rbd_dev->lock_state);
3174 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3175 return false;
3176
bbead745
ID
3177 rbd_unlock(rbd_dev);
3178 /*
3179 * Give others a chance to grab the lock - we would re-acquire
3180 * almost immediately if we got new IO during ceph_osdc_sync()
3181 * otherwise. We need to ack our own notifications, so this
3182 * lock_dwork will be requeued from rbd_wait_state_locked()
3183 * after wake_requests() in rbd_handle_released_lock().
3184 */
3185 cancel_delayed_work(&rbd_dev->lock_dwork);
ed95b21a
ID
3186 return true;
3187}
3188
3189static void rbd_release_lock_work(struct work_struct *work)
3190{
3191 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3192 unlock_work);
3193
3194 down_write(&rbd_dev->lock_rwsem);
3195 rbd_release_lock(rbd_dev);
3196 up_write(&rbd_dev->lock_rwsem);
3197}
3198
3199static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3200 void **p)
3201{
3202 struct rbd_client_id cid = { 0 };
3203
3204 if (struct_v >= 2) {
3205 cid.gid = ceph_decode_64(p);
3206 cid.handle = ceph_decode_64(p);
3207 }
3208
3209 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3210 cid.handle);
3211 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3212 down_write(&rbd_dev->lock_rwsem);
3213 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3214 /*
3215 * we already know that the remote client is
3216 * the owner
3217 */
3218 up_write(&rbd_dev->lock_rwsem);
3219 return;
3220 }
3221
3222 rbd_set_owner_cid(rbd_dev, &cid);
3223 downgrade_write(&rbd_dev->lock_rwsem);
3224 } else {
3225 down_read(&rbd_dev->lock_rwsem);
3226 }
3227
3228 if (!__rbd_is_lock_owner(rbd_dev))
3229 wake_requests(rbd_dev, false);
3230 up_read(&rbd_dev->lock_rwsem);
3231}
3232
3233static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3234 void **p)
3235{
3236 struct rbd_client_id cid = { 0 };
3237
3238 if (struct_v >= 2) {
3239 cid.gid = ceph_decode_64(p);
3240 cid.handle = ceph_decode_64(p);
3241 }
3242
3243 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3244 cid.handle);
3245 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3246 down_write(&rbd_dev->lock_rwsem);
3247 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3248 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3249 __func__, rbd_dev, cid.gid, cid.handle,
3250 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3251 up_write(&rbd_dev->lock_rwsem);
3252 return;
3253 }
3254
3255 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3256 downgrade_write(&rbd_dev->lock_rwsem);
3257 } else {
3258 down_read(&rbd_dev->lock_rwsem);
3259 }
3260
3261 if (!__rbd_is_lock_owner(rbd_dev))
3262 wake_requests(rbd_dev, false);
3263 up_read(&rbd_dev->lock_rwsem);
3264}
3265
3b77faa0
ID
3266/*
3267 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3268 * ResponseMessage is needed.
3269 */
3270static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3271 void **p)
ed95b21a
ID
3272{
3273 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3274 struct rbd_client_id cid = { 0 };
3b77faa0 3275 int result = 1;
ed95b21a
ID
3276
3277 if (struct_v >= 2) {
3278 cid.gid = ceph_decode_64(p);
3279 cid.handle = ceph_decode_64(p);
3280 }
3281
3282 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3283 cid.handle);
3284 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3285 return result;
ed95b21a
ID
3286
3287 down_read(&rbd_dev->lock_rwsem);
3b77faa0
ID
3288 if (__rbd_is_lock_owner(rbd_dev)) {
3289 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3290 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3291 goto out_unlock;
3292
3293 /*
3294 * encode ResponseMessage(0) so the peer can detect
3295 * a missing owner
3296 */
3297 result = 0;
3298
3299 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
e010dd0a
ID
3300 if (!rbd_dev->opts->exclusive) {
3301 dout("%s rbd_dev %p queueing unlock_work\n",
3302 __func__, rbd_dev);
3303 queue_work(rbd_dev->task_wq,
3304 &rbd_dev->unlock_work);
3305 } else {
3306 /* refuse to release the lock */
3307 result = -EROFS;
3308 }
ed95b21a
ID
3309 }
3310 }
3b77faa0
ID
3311
3312out_unlock:
ed95b21a 3313 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3314 return result;
ed95b21a
ID
3315}
3316
3317static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3318 u64 notify_id, u64 cookie, s32 *result)
3319{
3320 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
08a79102
KS
3321 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3322 int buf_size = sizeof(buf);
ed95b21a
ID
3323 int ret;
3324
3325 if (result) {
3326 void *p = buf;
3327
3328 /* encode ResponseMessage */
3329 ceph_start_encoding(&p, 1, 1,
3330 buf_size - CEPH_ENCODING_START_BLK_LEN);
3331 ceph_encode_32(&p, *result);
3332 } else {
3333 buf_size = 0;
3334 }
b8d70035 3335
922dab61
ID
3336 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3337 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3338 buf, buf_size);
52bb1f9b 3339 if (ret)
ed95b21a
ID
3340 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3341}
3342
3343static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3344 u64 cookie)
3345{
3346 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3347 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3348}
3349
3350static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3351 u64 notify_id, u64 cookie, s32 result)
3352{
3353 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3354 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3355}
3356
3357static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3358 u64 notifier_id, void *data, size_t data_len)
3359{
3360 struct rbd_device *rbd_dev = arg;
3361 void *p = data;
3362 void *const end = p + data_len;
d4c2269b 3363 u8 struct_v = 0;
ed95b21a
ID
3364 u32 len;
3365 u32 notify_op;
3366 int ret;
3367
3368 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3369 __func__, rbd_dev, cookie, notify_id, data_len);
3370 if (data_len) {
3371 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3372 &struct_v, &len);
3373 if (ret) {
3374 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3375 ret);
3376 return;
3377 }
3378
3379 notify_op = ceph_decode_32(&p);
3380 } else {
3381 /* legacy notification for header updates */
3382 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3383 len = 0;
3384 }
3385
3386 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3387 switch (notify_op) {
3388 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3389 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3390 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3391 break;
3392 case RBD_NOTIFY_OP_RELEASED_LOCK:
3393 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3394 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3395 break;
3396 case RBD_NOTIFY_OP_REQUEST_LOCK:
3b77faa0
ID
3397 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3398 if (ret <= 0)
ed95b21a 3399 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3400 cookie, ret);
ed95b21a
ID
3401 else
3402 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3403 break;
3404 case RBD_NOTIFY_OP_HEADER_UPDATE:
3405 ret = rbd_dev_refresh(rbd_dev);
3406 if (ret)
3407 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3408
3409 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3410 break;
3411 default:
3412 if (rbd_is_lock_owner(rbd_dev))
3413 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3414 cookie, -EOPNOTSUPP);
3415 else
3416 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3417 break;
3418 }
b8d70035
AE
3419}
3420
99d16943
ID
3421static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3422
922dab61 3423static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3424{
922dab61 3425 struct rbd_device *rbd_dev = arg;
bb040aa0 3426
922dab61 3427 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3428
ed95b21a
ID
3429 down_write(&rbd_dev->lock_rwsem);
3430 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3431 up_write(&rbd_dev->lock_rwsem);
3432
99d16943
ID
3433 mutex_lock(&rbd_dev->watch_mutex);
3434 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3435 __rbd_unregister_watch(rbd_dev);
3436 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3437
99d16943 3438 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3439 }
99d16943 3440 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3441}
3442
/*
 * watch_mutex must be locked
 */
static int __rbd_register_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_linger_request *handle;

	rbd_assert(!rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
				 &rbd_dev->header_oloc, rbd_watch_cb,
				 rbd_watch_errcb, rbd_dev);
	if (IS_ERR(handle))
		return PTR_ERR(handle);

	rbd_dev->watch_handle = handle;
	return 0;
}

/*
 * watch_mutex must be locked
 */
static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	rbd_assert(rbd_dev->watch_handle);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
	if (ret)
		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);

	rbd_dev->watch_handle = NULL;
}

static int rbd_register_watch(struct rbd_device *rbd_dev)
{
	int ret;

	mutex_lock(&rbd_dev->watch_mutex);
	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
	ret = __rbd_register_watch(rbd_dev);
	if (ret)
		goto out;

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;

out:
	mutex_unlock(&rbd_dev->watch_mutex);
	return ret;
}

static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}

static void rbd_unregister_watch(struct rbd_device *rbd_dev)
{
	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
	cancel_tasks_sync(rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
		__rbd_unregister_watch(rbd_dev);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	mutex_unlock(&rbd_dev->watch_mutex);

	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}

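/*
 * Called after the watch (and with it the lock cookie) has changed:
 * try to update the cookie on the OSD in place; if the OSD is too
 * old for that, release the lock and queue a fresh acquire.
 */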
/*
 * lock_rwsem must be held for write
 */
static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
				  RBD_LOCK_TAG, cookie);
	if (ret) {
		if (ret != -EOPNOTSUPP)
			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
				 ret);

		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
		if (rbd_release_lock(rbd_dev))
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->lock_dwork, 0);
	} else {
		__rbd_lock(rbd_dev, cookie);
	}
}

static void rbd_reregister_watch(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, watch_dwork);
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	ret = __rbd_register_watch(rbd_dev);
	if (ret) {
		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
		if (ret == -EBLACKLISTED || ret == -ENOENT) {
			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
			wake_requests(rbd_dev, true);
		} else {
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->watch_dwork,
					   RBD_RETRY_DELAY);
		}
		mutex_unlock(&rbd_dev->watch_mutex);
		return;
	}

	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
	mutex_unlock(&rbd_dev->watch_mutex);

	down_write(&rbd_dev->lock_rwsem);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		rbd_reacquire_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     struct ceph_object_id *oid,
			     struct ceph_object_locator *oloc,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct page *req_page = NULL;
	struct page *reply_page;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	if (outbound) {
		if (outbound_size > PAGE_SIZE)
			return -E2BIG;

		req_page = alloc_page(GFP_KERNEL);
		if (!req_page)
			return -ENOMEM;

		memcpy(page_address(req_page), outbound, outbound_size);
	}

	reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		if (req_page)
			__free_page(req_page);
		return -ENOMEM;
	}

	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
			     reply_page, &inbound_size);
	if (!ret) {
		memcpy(inbound, page_address(reply_page), inbound_size);
		ret = inbound_size;
	}

	if (req_page)
		__free_page(req_page);
	__free_page(reply_page);
	return ret;
}

/*
 * lock_rwsem must be held for read
 */
static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
{
	DEFINE_WAIT(wait);
	unsigned long timeout;
	int ret = 0;

	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
		return -EBLACKLISTED;

	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		return 0;

	if (!may_acquire) {
		rbd_warn(rbd_dev, "exclusive lock required");
		return -EROFS;
	}

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		timeout = schedule_timeout(ceph_timeout_jiffies(
						rbd_dev->opts->lock_timeout));
		down_read(&rbd_dev->lock_rwsem);
		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			ret = -EBLACKLISTED;
			break;
		}
		if (!timeout) {
			rbd_warn(rbd_dev, "timed out waiting for lock");
			ret = -ETIMEDOUT;
			break;
		}
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	finish_wait(&rbd_dev->lock_waitq, &wait);
	return ret;
}

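/*
 * Per-request work function: validate the block layer request,
 * translate it into an image request and submit it.  Runs in process
 * context so it may block, e.g. waiting for the exclusive lock.
 */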
static void rbd_queue_workfn(struct work_struct *work)
{
	struct request *rq = blk_mq_rq_from_pdu(work);
	struct rbd_device *rbd_dev = rq->q->queuedata;
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	enum obj_operation_type op_type;
	u64 mapping_size;
	bool must_be_locked;
	int result;

	switch (req_op(rq)) {
	case REQ_OP_DISCARD:
		op_type = OBJ_OP_DISCARD;
		break;
	case REQ_OP_WRITE_ZEROES:
		op_type = OBJ_OP_ZEROOUT;
		break;
	case REQ_OP_WRITE:
		op_type = OBJ_OP_WRITE;
		break;
	case REQ_OP_READ:
		op_type = OBJ_OP_READ;
		break;
	default:
		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
		result = -EIO;
		goto err;
	}

	/* Ignore/skip any zero-length requests */

	if (!length) {
		dout("%s: zero-length request\n", __func__);
		result = 0;
		goto err_rq;
	}

	rbd_assert(op_type == OBJ_OP_READ ||
		   rbd_dev->spec->snap_id == CEPH_NOSNAP);

	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
	 * still possible the snapshot will have disappeared by the
	 * time our request arrives at the osd, but there's no sense in
	 * sending it if we already know.
	 */
	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
		dout("request for non-existent snapshot");
		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
		result = -ENXIO;
		goto err_rq;
	}

	if (offset && length > U64_MAX - offset + 1) {
		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
			 length);
		result = -EINVAL;
		goto err_rq;	/* Shouldn't happen */
	}

	blk_mq_start_request(rq);

	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (op_type != OBJ_OP_READ) {
		snapc = rbd_dev->header.snapc;
		ceph_get_snap_context(snapc);
	}
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, mapping_size);
		result = -EIO;
		goto err_rq;
	}

	must_be_locked =
	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
	if (must_be_locked) {
		down_read(&rbd_dev->lock_rwsem);
		result = rbd_wait_state_locked(rbd_dev,
					       !rbd_dev->opts->exclusive);
		if (result)
			goto err_unlock;
	}

	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_unlock;
	}
	img_request->rq = rq;
	snapc = NULL; /* img_request consumes a ref */

	if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
		result = rbd_img_fill_nodata(img_request, offset, length);
	else
		result = rbd_img_fill_from_bio(img_request, offset, length,
					       rq->bio);
	if (result || !img_request->pending_count)
		goto err_img_request;

	rbd_img_request_submit(img_request);
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
	return;

err_img_request:
	rbd_img_request_put(img_request);
err_unlock:
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
err_rq:
	if (result)
		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
			 obj_op_name(op_type), length, offset, result);
	ceph_put_snap_context(snapc);
err:
	blk_mq_end_request(rq, errno_to_blk_status(result));
}

static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
		const struct blk_mq_queue_data *bd)
{
	struct request *rq = bd->rq;
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	queue_work(rbd_wq, work);
	return BLK_STS_OK;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	blk_cleanup_queue(rbd_dev->disk->queue);
	blk_mq_free_tag_set(&rbd_dev->tag_set);
	put_disk(rbd_dev->disk);
	rbd_dev->disk = NULL;
}

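/*
 * Synchronously read up to @buf_len bytes from the start of the
 * given object into @buf.  Returns the number of bytes read on
 * success, or a negative error code.
 */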
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
			     struct ceph_object_id *oid,
			     struct ceph_object_locator *oloc,
			     void *buf, int buf_len)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	struct page **pages;
	int num_pages = calc_pages_for(0, buf_len);
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_READ;

	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto out_req;
	}

	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
					 true);

	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
	if (ret)
		goto out_req;

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req);
	if (ret >= 0)
		ceph_copy_from_page_vector(pages, buf, 0, ret);

out_req:
	ceph_osdc_put_request(req);
	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
					&rbd_dev->header_oloc, ondisk, size);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	/*
	 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
	 * try to update its size.  If REMOVING is set, updating size
	 * is just useless work since the device can't be opened.
	 */
	if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
	    !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}

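/*
 * Re-read the image header and, for a clone, the parent info, then
 * propagate any size change to the block device.  Called both for
 * header update notifications and for explicit refresh requests
 * written to the sysfs refresh attribute.
 */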
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto out;

	/*
	 * If there is a parent, see if it has disappeared due to the
	 * mapped image getting flattened.
	 */
	if (rbd_dev->parent) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;
	}

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
	} else {
		/* validate mapped snapshot's EXISTS flag */
		rbd_exists_validate(rbd_dev);
	}

out:
	up_write(&rbd_dev->header_rwsem);
	if (!ret && mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}

static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
		unsigned int hctx_idx, unsigned int numa_node)
{
	struct work_struct *work = blk_mq_rq_to_pdu(rq);

	INIT_WORK(work, rbd_queue_workfn);
	return 0;
}

static const struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,
	.init_request	= rbd_init_request,
};

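/*
 * Set up the gendisk and its blk-mq queue for the mapped image.
 * Queue limits are derived from the object set size (object size
 * times stripe count) so that requests line up with RADOS objects.
 */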
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	unsigned int objset_bytes =
	    rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
	int err;

	/* create gendisk info */
	disk = alloc_disk(single_major ?
			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
			  RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = rbd_dev->minor;
	if (single_major)
		disk->flags |= GENHD_FL_EXT_DEVT;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
	if (err)
		goto out_disk;

	q = blk_mq_init_queue(&rbd_dev->tag_set);
	if (IS_ERR(q)) {
		err = PTR_ERR(q);
		goto out_tag_set;
	}

	blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
	/* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */

	blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
	q->limits.max_sectors = queue_max_hw_sectors(q);
	blk_queue_max_segments(q, USHRT_MAX);
	blk_queue_max_segment_size(q, UINT_MAX);
	blk_queue_io_min(q, objset_bytes);
	blk_queue_io_opt(q, objset_bytes);

	if (rbd_dev->opts->trim) {
		blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
		q->limits.discard_granularity = objset_bytes;
		blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
		blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
	}

	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;

	/*
	 * disk_release() expects a queue ref from add_disk() and will
	 * put it.  Hold an extra ref until add_disk() is called.
	 */
	WARN_ON(!blk_get_queue(q));
	disk->queue = q;
	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_tag_set:
	blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk:
	put_disk(disk);
	return err;
}

/*
  sysfs
*/

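/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/.  For
 * example (device id and pool name here are illustrative only):
 *
 *   $ cat /sys/bus/rbd/devices/0/pool
 *   rbd
 *
 * All of them are read-only, except for refresh (write-only) and
 * config_info (readable by root only).
 */
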
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
		       (unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_minor_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->minor);
}

static ssize_t rbd_client_addr_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct ceph_entity_addr *client_addr =
	    ceph_client_addr(rbd_dev->rbd_client->client);

	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
		       le32_to_cpu(client_addr->nonce));
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_gid(rbd_dev->rbd_client->client));
}

static ssize_t rbd_cluster_fsid_show(struct device *dev,
				     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
}

static ssize_t rbd_config_info_show(struct device *dev,
				    struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->config_info);
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_pool_ns_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
}

/*
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images or if there is no parent, shows "(no parent
 * image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			       struct device_attribute *attr,
			       char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	ssize_t count = 0;

	if (!rbd_dev->parent)
		return sprintf(buf, "(no parent image)\n");

	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
		struct rbd_spec *spec = rbd_dev->parent_spec;

		count += sprintf(&buf[count], "%s"
			    "pool_id %llu\npool_name %s\n"
			    "pool_ns %s\n"
			    "image_id %s\nimage_name %s\n"
			    "snap_id %llu\nsnap_name %s\n"
			    "overlap %llu\n",
			    !count ? "" : "\n", /* first? */
			    spec->pool_id, spec->pool_name,
			    spec->pool_ns ?: "",
			    spec->image_id, spec->image_name ?: "(unknown)",
			    spec->snap_id, spec->snap_name,
			    rbd_dev->parent_overlap);
	}

	return count;
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		return ret;

	return size;
}

static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_minor.attr,
	&dev_attr_client_addr.attr,
	&dev_attr_client_id.attr,
	&dev_attr_cluster_fsid.attr,
	&dev_attr_config_info.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_pool_ns.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_dev_release(struct device *dev);

static const struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;

	spec->pool_id = CEPH_NOPOOL;
	spec->snap_id = CEPH_NOSNAP;
	kref_init(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->pool_ns);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

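/*
 * Final teardown of an rbd_device: both the watch and the exclusive
 * lock must already be quiescent, as asserted below.  Releases
 * everything __rbd_dev_create() set up.
 */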
static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);

	ceph_oid_destroy(&rbd_dev->header_oid);
	ceph_oloc_destroy(&rbd_dev->header_oloc);
	kfree(rbd_dev->config_info);

	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	if (need_put) {
		destroy_workqueue(rbd_dev->task_wq);
		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
	}

	rbd_dev_free(rbd_dev);

	/*
	 * This is racy, but way better than putting module outside of
	 * the release callback.  The race window is pretty small, so
	 * doing something similar to dm (dm-builtin.c) is overkill.
	 */
	if (need_put)
		module_put(THIS_MODULE);
}

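/*
 * Allocate and minimally initialize an rbd_device.  rbd_dev_create()
 * below layers the mapping-specific pieces (opts, device id, task
 * workqueue, module reference) on top of this.
 */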
static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
					   struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->header.data_pool_id = CEPH_NOPOOL;
	ceph_oid_init(&rbd_dev->header_oid);
	rbd_dev->header_oloc.pool = spec->pool_id;
	if (spec->pool_ns) {
		WARN_ON(!*spec->pool_ns);
		rbd_dev->header_oloc.pool_ns =
		    ceph_find_or_create_string(spec->pool_ns,
					       strlen(spec->pool_ns));
	}

	mutex_init(&rbd_dev->watch_mutex);
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);

	init_rwsem(&rbd_dev->lock_rwsem);
	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
	init_waitqueue_head(&rbd_dev->lock_waitq);

	rbd_dev->dev.bus = &rbd_bus_type;
	rbd_dev->dev.type = &rbd_device_type;
	rbd_dev->dev.parent = &rbd_root_dev;
	device_initialize(&rbd_dev->dev);

	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;

	return rbd_dev;
}

/*
 * Create a mapping rbd_dev.
 */
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

	rbd_dev = __rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		return NULL;

	rbd_dev->opts = opts;

	/* get an id and fill in device name */
	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
					 minor_to_rbd_dev_id(1 << MINORBITS),
					 GFP_KERNEL);
	if (rbd_dev->dev_id < 0)
		goto fail_rbd_dev;

	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
						   rbd_dev->name);
	if (!rbd_dev->task_wq)
		goto fail_dev_id;

	/* we have a ref from do_rbd_add() */
	__module_get(THIS_MODULE);

	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
	return rbd_dev;

fail_dev_id:
	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
	rbd_dev_free(rbd_dev);
	return NULL;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	if (rbd_dev)
		put_device(&rbd_dev->dev);
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_size",
				  &snapid, sizeof(snapid),
				  &size_buf, sizeof(size_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;

	if (order) {
		*order = size_buf.order;
		dout(" order %u", (unsigned int)*order);
	}
	*snap_size = le64_to_cpu(size_buf.size);

	dout(" snap_id 0x%016llx snap_size = %llu\n",
	     (unsigned long long)snap_id,
	     (unsigned long long)*snap_size);

	return 0;
}

static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_object_prefix",
				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + ret, NULL, GFP_NOIO);
	ret = 0;

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} __attribute__ ((packed)) features_buf = { 0 };
	u64 unsup;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_features",
				  &snapid, sizeof(snapid),
				  &features_buf, sizeof(features_buf));
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < sizeof (features_buf))
		return -ERANGE;

	unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
	if (unsup) {
		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
			 unsup);
		return -ENXIO;
	}

	*snap_features = le64_to_cpu(features_buf.features);

	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long)snap_id,
		(unsigned long long)*snap_features,
		(unsigned long long)le64_to_cpu(features_buf.incompat));

	return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}

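/*
 * Decoded form of a clone's parent image reference, as returned by
 * the "parent_get"/"parent_overlap_get" class methods or the legacy
 * "get_parent" method (for which has_overlap is always true).
 */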
struct parent_image_info {
	u64		pool_id;
	const char	*pool_ns;
	const char	*image_id;
	u64		snap_id;

	bool		has_overlap;
	u64		overlap;
};

/*
 * The caller is responsible for @pii.
 */
static int decode_parent_image_spec(void **p, void *end,
				    struct parent_image_info *pii)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
				  &struct_v, &struct_len);
	if (ret)
		return ret;

	ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
	pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
	if (IS_ERR(pii->pool_ns)) {
		ret = PTR_ERR(pii->pool_ns);
		pii->pool_ns = NULL;
		return ret;
	}
	pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
	if (IS_ERR(pii->image_id)) {
		ret = PTR_ERR(pii->image_id);
		pii->image_id = NULL;
		return ret;
	}
	ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
	return 0;

e_inval:
	return -EINVAL;
}

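/*
 * The caller is responsible for @pii.  Returns 1 if the OSD doesn't
 * support the "parent_get" method, in which case the caller should
 * fall back to __get_parent_info_legacy().
 */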
static int __get_parent_info(struct rbd_device *rbd_dev,
			     struct page *req_page,
			     struct page *reply_page,
			     struct parent_image_info *pii)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	size_t reply_len = PAGE_SIZE;
	void *p, *end;
	int ret;

	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			     "rbd", "parent_get", CEPH_OSD_FLAG_READ,
			     req_page, sizeof(u64), reply_page, &reply_len);
	if (ret)
		return ret == -EOPNOTSUPP ? 1 : ret;

	p = page_address(reply_page);
	end = p + reply_len;
	ret = decode_parent_image_spec(&p, end, pii);
	if (ret)
		return ret;

	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			     "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
			     req_page, sizeof(u64), reply_page, &reply_len);
	if (ret)
		return ret;

	p = page_address(reply_page);
	end = p + reply_len;
	ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
	if (pii->has_overlap)
		ceph_decode_64_safe(&p, end, pii->overlap, e_inval);

	return 0;

e_inval:
	return -EINVAL;
}

/*
 * The caller is responsible for @pii.
 */
static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
				    struct page *req_page,
				    struct page *reply_page,
				    struct parent_image_info *pii)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	size_t reply_len = PAGE_SIZE;
	void *p, *end;
	int ret;

	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
			     "rbd", "get_parent", CEPH_OSD_FLAG_READ,
			     req_page, sizeof(u64), reply_page, &reply_len);
	if (ret)
		return ret;

	p = page_address(reply_page);
	end = p + reply_len;
	ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
	pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(pii->image_id)) {
		ret = PTR_ERR(pii->image_id);
		pii->image_id = NULL;
		return ret;
	}
	ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
	pii->has_overlap = true;
	ceph_decode_64_safe(&p, end, pii->overlap, e_inval);

	return 0;

e_inval:
	return -EINVAL;
}

static int get_parent_info(struct rbd_device *rbd_dev,
			   struct parent_image_info *pii)
{
	struct page *req_page, *reply_page;
	void *p;
	int ret;

	req_page = alloc_page(GFP_KERNEL);
	if (!req_page)
		return -ENOMEM;

	reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		__free_page(req_page);
		return -ENOMEM;
	}

	p = page_address(req_page);
	ceph_encode_64(&p, rbd_dev->spec->snap_id);
	ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
	if (ret > 0)
		ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
					       pii);

	__free_page(req_page);
	__free_page(reply_page);
	return ret;
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	struct parent_image_info pii = { 0 };
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	ret = get_parent_info(rbd_dev, &pii);
	if (ret)
		goto out_err;

	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
	     __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
	     pii.has_overlap, pii.overlap);

	if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
		/*
		 * Either the parent never existed, or we have
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
		 *
		 * If !pii.has_overlap, the parent image spec is not
		 * applicable.  It's there to avoid duplication in each
		 * snapshot record.
		 */
		if (rbd_dev->parent_overlap) {
			rbd_dev->parent_overlap = 0;
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image has been flattened\n",
				rbd_dev->disk->disk_name);
		}

		goto out;	/* No parent?  No problem. */
	}

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (pii.pool_id > (u64)U32_MAX) {
		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
			(unsigned long long)pii.pool_id, U32_MAX);
		goto out_err;
	}

	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we have not already done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pii.pool_id;
		if (pii.pool_ns && *pii.pool_ns) {
			parent_spec->pool_ns = pii.pool_ns;
			pii.pool_ns = NULL;
		}
		parent_spec->image_id = pii.image_id;
		pii.image_id = NULL;
		parent_spec->snap_id = pii.snap_id;

		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	}

	/*
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there was no parent.
	 */
	if (!pii.overlap) {
		if (parent_spec) {
			/* refresh, careful to warn just once */
			if (rbd_dev->parent_overlap)
				rbd_warn(rbd_dev,
				    "clone now standalone (overlap became 0)");
		} else {
			/* initial probe */
			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
		}
	}
	rbd_dev->parent_overlap = pii.overlap;

out:
	ret = 0;
out_err:
	kfree(pii.pool_ns);
	kfree(pii.image_id);
	rbd_spec_put(parent_spec);
	return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				&rbd_dev->header_oloc, "get_stripe_unit_count",
				NULL, 0, &striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	p = &striping_info_buf;
	rbd_dev->header.stripe_unit = ceph_decode_64(&p);
	rbd_dev->header.stripe_count = ceph_decode_64(&p);
	return 0;
}

static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
{
	__le64 data_pool_id;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_data_pool",
				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
	if (ret < 0)
		return ret;
	if (ret < sizeof(data_pool_id))
		return -EBADMSG;

	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
	return 0;
}

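/*
 * Look up the image name for rbd_dev's image id in the pool's
 * RBD_DIRECTORY object.  Returns a dynamically allocated name, or
 * NULL on any failure; callers treat the name as optional.
 */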
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	CEPH_DEFINE_OID_ONSTACK(oid);
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				  "dir_get_name", image_id, image_id_size,
				  reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * An image being mapped will have everything but the snap id.
 */
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;

	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
	rbd_assert(spec->image_id && spec->image_name);
	rbd_assert(spec->snap_name);

	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
		u64 snap_id;

		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
		if (snap_id == CEPH_NOSNAP)
			return -ENOENT;

		spec->snap_id = snap_id;
	} else {
		spec->snap_id = CEPH_NOSNAP;
	}

	return 0;
}

/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	rbd_assert(spec->pool_id != CEPH_NOPOOL);
	rbd_assert(spec->image_id);
	rbd_assert(spec->snap_id != CEPH_NOSNAP);

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Fetch the snapshot name */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;

out_err:
	kfree(image_name);
	kfree(pool_name);
	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_snapcontext",
				  NULL, 0, reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}

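/*
 * A rough sketch of the get_snapcontext reply decoded above, inferred
 * from the decode calls rather than from a wire-format specification
 * (all integers are little-endian on the wire):
 *
 *	__le64 seq;			maximum snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids, in descending order
 */
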
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, "get_snapshot_name",
				  &snapid, sizeof(snapid), reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

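/*
 * The get_snapshot_name reply above is a single length-prefixed string.
 * Roughly (inferred from ceph_extract_encoded_string(), which copies it
 * out and NUL-terminates it):
 *
 *	__le32 len;
 *	char   name[len];		not NUL-terminated on the wire
 */
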
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret && first_time) {
		kfree(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	}

	return ret;
}

static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_header_info(rbd_dev);

	return rbd_dev_v2_header_info(rbd_dev);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

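/*
 * For example (hypothetical input): with *buf pointing at "  rbd foo",
 * next_token() advances *buf past the two spaces to "rbd foo" and
 * returns 3, the length of "rbd".
 */
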
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}

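/*
 * A minimal usage sketch (illustrative only; the input string and
 * variable names are made up):
 *
 *	const char *buf = "pool image";
 *	char *pool = dup_token(&buf, NULL);	returns "pool"
 *	char *image = dup_token(&buf, NULL);	returns "image"
 *
 * Each returned string must eventually be released with kfree().
 */
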
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *	The address of a pointer that will refer to a ceph options
 *	structure.  Caller must release the returned pointer using
 *	ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *	A comma-separated list of one or more monitor addresses.
 *	A monitor address is an ip address, optionally followed
 *	by a port number (separated by a colon).
 *	  I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *	A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *	The name of the rados pool containing the rbd image.
 *  <image_name>
 *	The name of the image in that pool to map.
 *  <snap_id>
 *	An optional snapshot id.  If provided, the mapping will
 *	present data from the image at the time that snapshot was
 *	created.  The image head is used if no snapshot id is
 *	provided.  Snapshot mappings are always read-only.
 */
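/*
 * Illustrative example only (the monitor address, key, and names below
 * are made up, not taken from a real cluster): a write such as
 *
 *	$ echo "192.168.0.1:6789 name=admin,secret=<key> rbd foo snap1" \
 *		> /sys/bus/rbd/add
 *
 * would request a read-only mapping of snapshot "snap1" of image "foo"
 * in pool "rbd", using the given monitor address and credentials.
 * Omitting "snap1" would map the image head read-write.
 */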
static int rbd_add_parse_args(const char *buf,
			      struct ceph_options **ceph_opts,
			      struct rbd_options **opts,
			      struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	char *snap_name;
	size_t mon_addrs_size;
	struct parse_rbd_opts_ctx pctx = { 0 };
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	pctx.spec = rbd_spec_alloc();
	if (!pctx.spec)
		goto out_mem;

	pctx.spec->pool_name = dup_token(&buf, NULL);
	if (!pctx.spec->pool_name)
		goto out_mem;
	if (!*pctx.spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	pctx.spec->image_name = dup_token(&buf, NULL);
	if (!pctx.spec->image_name)
		goto out_mem;
	if (!*pctx.spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_mem;
	*(snap_name + len) = '\0';
	pctx.spec->snap_name = snap_name;

	/* Initialize all rbd options to the defaults */

	pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
	if (!pctx.opts)
		goto out_mem;

	pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
	pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
	pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
	pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
	pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
	pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
	pctx.opts->trim = RBD_TRIM_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
				   mon_addrs + mon_addrs_size - 1,
				   parse_rbd_opts_token, &pctx);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = pctx.opts;
	*rbd_spec = pctx.spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(pctx.opts);
	rbd_spec_put(pctx.spec);
	kfree(options);

	return ret;
}

static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
{
	down_write(&rbd_dev->lock_rwsem);
	if (__rbd_is_lock_owner(rbd_dev))
		rbd_unlock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
}

static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
{
	int ret;

	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
		return -EINVAL;
	}

	/* FIXME: "rbd map --exclusive" should be interruptible */
	down_read(&rbd_dev->lock_rwsem);
	ret = rbd_wait_state_locked(rbd_dev, true);
	up_read(&rbd_dev->lock_rwsem);
	if (ret) {
		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
		return -EROFS;
	}

	return 0;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	CEPH_DEFINE_OID_ONSTACK(oid);
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
			       rbd_dev->spec->image_name);
	if (ret)
		return ret;

	dout("rbd id object name is %s\n", oid.name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
				  "get_id", NULL, 0,
				  response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret >= 0) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	ceph_oid_destroy(&oid);
	return ret;
}

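/*
 * For example, assuming RBD_ID_PREFIX is "rbd_id." (see rbd_types.h),
 * probing a format 2 image named "foo" reads its id from an object
 * named "rbd_id.foo".  A format 1 image has no such object, so the
 * -ENOENT branch above applies and an empty image id is recorded.
 */
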
/*
 * Undo whatever state changes are made by v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
		ret = rbd_dev_v2_data_pool(rbd_dev);
		if (ret)
			goto out_err;
	}

	rbd_init_layout(rbd_dev);
	return 0;

out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;
	return ret;
}

/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}

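/*
 * For instance, mapping a clone whose parent is itself a clone probes
 * with depth 0 (the mapped image), then 1 (its parent), then 2 (the
 * grandparent); a chain deeper than RBD_MAX_PARENT_CHAIN_LEN is
 * rejected above with -EINVAL.
 */
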
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	rbd_dev_mapping_clear(rbd_dev);
	rbd_free_disk(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);

	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	if (ret)
		goto err_out_mapping;

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);
	return 0;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

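/*
 * In single-major mode the minor number is derived from the device id.
 * Assuming rbd_dev_id_to_minor() shifts by RBD_SINGLE_MAJOR_PART_SHIFT
 * (4 bits), device id 2 would map to minor 32, leaving minors 33..47
 * for that device's partitions.
 */
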
static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}

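/*
 * For example, assuming RBD_SUFFIX is ".rbd" and RBD_HEADER_PREFIX is
 * "rbd_header." (both from rbd_types.h), a format 1 image named "foo"
 * gets header object "foo.rbd", while a format 2 image with id "1234"
 * gets header object "rbd_header.1234".
 */
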
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	if (rbd_dev->opts)
		rbd_unregister_watch(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s%s%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->pool_ns ?: "",
					rbd_dev->spec->pool_ns ? "/" : "",
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s%s%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->pool_ns ?: "",
				rbd_dev->spec->pool_ns ? "/" : "",
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
	if (!rbd_dev->config_info) {
		rc = -ENOMEM;
		goto err_out_rbd_dev;
	}

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0) {
		up_write(&rbd_dev->header_rwsem);
		goto err_out_rbd_dev;
	}

	/* If we are mapping a snapshot it must be marked read-only */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		rbd_dev->opts->read_only = true;

	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
			 rbd_dev->layout.object_size);
		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
	}

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
		goto err_out_image_probe;

	if (rbd_dev->opts->exclusive) {
		rc = rbd_add_acquire_lock(rbd_dev);
		if (rc)
			goto err_out_device_setup;
	}

	/* Everything's ready.  Announce the disk to the world. */

	rc = device_add(&rbd_dev->dev);
	if (rc)
		goto err_out_image_lock;

	add_disk(rbd_dev->disk);
	/* see rbd_init_disk() */
	blk_put_queue(rbd_dev->disk->queue);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);
	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_image_lock:
	rbd_dev_image_unlock(rbd_dev);
err_out_device_setup:
	rbd_dev_device_release(rbd_dev);
err_out_image_probe:
	rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		rbd_dev_destroy(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

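/*
 * Illustrative usage of the remove interface below (device id made up):
 *
 *	$ echo 0 > /sys/bus/rbd/remove
 *
 * unmaps the device with id 0, failing with -EBUSY if it is still open,
 * while
 *
 *	$ echo "0 force" > /sys/bus/rbd/remove
 *
 * freezes the queue, fails outstanding IO, and removes the device even
 * if it is open.
 */
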
static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	char opt_buf[6];
	bool force = false;
	int ret;

	dev_id = -1;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", &dev_id, opt_buf);
	if (dev_id < 0) {
		pr_err("dev_id out of range\n");
		return -EINVAL;
	}
	if (opt_buf[0] != '\0') {
		if (!strcmp(opt_buf, "force")) {
			force = true;
		} else {
			pr_err("bad remove option at '%s'\n", opt_buf);
			return -EINVAL;
		}
	}

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count && !force)
			ret = -EBUSY;
		else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
					  &rbd_dev->flags))
			ret = -EINPROGRESS;
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret)
		return ret;

	if (force) {
		/*
		 * Prevent new IO from being queued and wait for existing
		 * IO to complete/fail.
		 */
		blk_mq_freeze_queue(rbd_dev->disk->queue);
		blk_set_queue_dying(rbd_dev->disk->queue);
	}

	del_gendisk(rbd_dev->disk);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);
	device_del(&rbd_dev->dev);

	rbd_dev_image_unlock(rbd_dev);
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	rbd_dev_destroy(rbd_dev);
	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int __init rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void __exit rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	return 0;

out_err:
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");