rbd: update rbd_img_request_submit() signature
[linux-block.git] / drivers/block/rbd.c

/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

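/*
 * Illustrative sketch (not part of the driver): together these helpers
 * implement a saturating reference count that can be poisoned at zero,
 * the way rbd_dev_parent_get()/rbd_dev_parent_put() use it below:
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		... issue request against the referenced object ...
 *		if (!atomic_dec_return_safe(&ref))
 *			... that was the last reference, tear down ...
 *	}
 *
 * Once the counter sits at 0, atomic_inc_return_safe() leaves it there
 * and returns 0; on overflow past INT_MAX it returns -EINVAL, and
 * atomic_dec_return_safe() returns -EINVAL on underflow.
 */
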
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *            |     ^                              |
 *            v     \------------------------------/
 *          done
 *            ^
 *            |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

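/*
 * Illustrative walk-through (not part of the driver): a small write to
 * a clone whose backing object does not exist yet starts in
 * RBD_OBJ_WRITE_GUARD.  The guarded write fails because the object is
 * missing, so the request moves to RBD_OBJ_WRITE_COPYUP: the covered
 * range is read from the parent, merged with the new data, and written
 * out as a whole object.  A write to an image with no parent goes
 * straight through RBD_OBJ_WRITE_FLAT.
 */
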
struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	struct rbd_img_request	*img_request;
	u64			img_offset;
	/* links for img_request->obj_requests list */
	struct list_head	links;
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

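/*
 * Illustrative example (not part of the driver): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device owns 16 minors, so
 * rbd_dev_id_to_minor(2) == 32 and minor_to_rbd_dev_id(35) == 2;
 * in single-major mode minor 35 is a partition of /dev/rbd2.
 */
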
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

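/*
 * Illustrative output (hypothetical message text): depending on how far
 * device setup got, rbd_warn(rbd_dev, "failed to lock header") is
 * prefixed with the most specific identity available, e.g.:
 *
 *	rbd: rbd0: failed to lock header
 *	rbd: image foo: failed to lock header
 *	rbd: id 5d21774b0dc51: failed to lock header
 */
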
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

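/*
 * Illustrative example (assumed userspace usage, not part of the
 * driver): these tokens come from the options field of the string
 * written to /sys/bus/rbd/add, e.g.
 *
 *	echo "1.2.3.4:6789 name=admin,queue_depth=256,lock_on_read \
 *	      mypool myimage -" > /sys/bus/rbd/add
 *
 * libceph consumes the options it recognizes (name=admin here) and
 * hands each remaining token to parse_rbd_opts_token().
 */
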
static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock to remove the client from the client
 * list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

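/*
 * Illustrative example (not part of the driver): the default rbd
 * object order is 22, so rbd_obj_bytes() yields 1U << 22 = 4 MiB
 * objects, and an image without the STRIPINGV2 feature is laid out
 * with stripe_unit = 4 MiB, stripe_count = 1.
 */
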
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

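/*
 * Illustrative example (not part of the driver): for a descending
 * snapc->snaps array { 40, 22, 7 }, rbd_dev_snap_index() returns 1 for
 * snap_id 22 and BAD_SNAP_INDEX for snap_id 10.  The inverted
 * comparison function is what lets bsearch() work on the descending
 * array.
 */
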
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

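/*
 * Illustrative example (not part of the driver): with 4 MiB objects
 * (segment size 0x400000), an image I/O of length 0x1000 at offset
 * 0x7ffe00 starts at object offset 0x3ffe00, and rbd_segment_length()
 * clips it to 0x200 bytes; the remaining 0xe00 bytes land in the next
 * object and become a separate object request.
 */
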
static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		return true;
	default:
		return false;
	}
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->offset &&
	       obj_req->length == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->offset + obj_req->length ==
	       rbd_dev->layout.object_size;
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
		return true;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	obj_request->callback(obj_request);
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->offset;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->object_no))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

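/*
 * Illustrative example (not part of the driver): for a format 2 image
 * whose object_prefix is "rbd_data.5d21774b0dc51", object_no 3 maps to
 * the OSD object name "rbd_data.5d21774b0dc51.0000000000000003", since
 * RBD_V2_DATA_FORMAT appends the object number as 16 hex digits.
 */
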
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *
rbd_obj_request_create(enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;

	rbd_assert(obj_request_type_valid(type));

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	default:
		rbd_assert(0);
	}

	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

fb65d228
AE
1700/* It's OK to call this for a device with no parent */
1701
1702static void rbd_spec_put(struct rbd_spec *spec);
1703static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1704{
1705 rbd_dev_remove_parent(rbd_dev);
1706 rbd_spec_put(rbd_dev->parent_spec);
1707 rbd_dev->parent_spec = NULL;
1708 rbd_dev->parent_overlap = 0;
1709}
1710
a2acd00e
AE
1711/*
1712 * Parent image reference counting is used to determine when an
1713 * image's parent fields can be safely torn down--after there are no
1714 * more in-flight requests to the parent image. When the last
1715 * reference is dropped, cleaning them up is safe.
1716 */
1717static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1718{
1719 int counter;
1720
1721 if (!rbd_dev->parent_spec)
1722 return;
1723
1724 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1725 if (counter > 0)
1726 return;
1727
1728 /* Last reference; clean up parent data structures */
1729
1730 if (!counter)
1731 rbd_dev_unparent(rbd_dev);
1732 else
9584d508 1733 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
1734}
1735
1736/*
1737 * If an image has a non-zero parent overlap, get a reference to its
1738 * parent.
1739 *
1740 * Returns true if the rbd device has a parent with a non-zero
1741 * overlap and a reference for it was successfully taken, or
1742 * false otherwise.
1743 */
1744static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1745{
ae43e9d0 1746 int counter = 0;
a2acd00e
AE
1747
1748 if (!rbd_dev->parent_spec)
1749 return false;
1750
ae43e9d0
ID
1751 down_read(&rbd_dev->header_rwsem);
1752 if (rbd_dev->parent_overlap)
1753 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1754 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
1755
1756 if (counter < 0)
9584d508 1757 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 1758
ae43e9d0 1759 return counter > 0;
a2acd00e
AE
1760}
1761
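/*
 * Callers pair this with rbd_dev_parent_put(): a layered image
 * request takes a parent reference in rbd_img_request_create() and
 * drops it in rbd_img_request_destroy(), so the parent spec is only
 * torn down once no in-flight request can still reach it.
 */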
bf0d5f50
AE
1762/*
1763 * Caller is responsible for filling in the list of object requests
1764 * that comprises the image request, and the Linux request pointer
1765 * (if there is one).
1766 */
cc344fa1
AE
1767static struct rbd_img_request *rbd_img_request_create(
1768 struct rbd_device *rbd_dev,
bf0d5f50 1769 u64 offset, u64 length,
6d2940c8 1770 enum obj_operation_type op_type,
4e752f0a 1771 struct ceph_snap_context *snapc)
bf0d5f50
AE
1772{
1773 struct rbd_img_request *img_request;
bf0d5f50 1774
a0c5895b 1775 img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
1776 if (!img_request)
1777 return NULL;
1778
bf0d5f50 1779 img_request->rbd_dev = rbd_dev;
9bb0248d 1780 img_request->op_type = op_type;
bf0d5f50
AE
1781 img_request->offset = offset;
1782 img_request->length = length;
9bb0248d 1783 if (!rbd_img_is_write(img_request))
bf0d5f50 1784 img_request->snap_id = rbd_dev->spec->snap_id;
9bb0248d
ID
1785 else
1786 img_request->snapc = snapc;
1787
a2acd00e 1788 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 1789 img_request_layered_set(img_request);
a0c5895b 1790
bf0d5f50 1791 spin_lock_init(&img_request->completion_lock);
bf0d5f50
AE
1792 INIT_LIST_HEAD(&img_request->obj_requests);
1793 kref_init(&img_request->kref);
1794
37206ee5 1795 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 1796 obj_op_name(op_type), offset, length, img_request);
37206ee5 1797
bf0d5f50
AE
1798 return img_request;
1799}
1800
1801static void rbd_img_request_destroy(struct kref *kref)
1802{
1803 struct rbd_img_request *img_request;
1804 struct rbd_obj_request *obj_request;
1805 struct rbd_obj_request *next_obj_request;
1806
1807 img_request = container_of(kref, struct rbd_img_request, kref);
1808
37206ee5
AE
1809 dout("%s: img %p\n", __func__, img_request);
1810
bf0d5f50
AE
1811 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1812 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1813 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 1814
a2acd00e
AE
1815 if (img_request_layered_test(img_request)) {
1816 img_request_layered_clear(img_request);
1817 rbd_dev_parent_put(img_request->rbd_dev);
1818 }
1819
9bb0248d 1820 if (rbd_img_is_write(img_request))
812164f8 1821 ceph_put_snap_context(img_request->snapc);
bf0d5f50 1822
1c2a9dfe 1823 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
1824}
1825
e93f3152
AE
1826static struct rbd_img_request *rbd_parent_request_create(
1827 struct rbd_obj_request *obj_request,
1828 u64 img_offset, u64 length)
1829{
1830 struct rbd_img_request *parent_request;
1831 struct rbd_device *rbd_dev;
1832
1833 rbd_assert(obj_request->img_request);
1834 rbd_dev = obj_request->img_request->rbd_dev;
1835
4e752f0a 1836 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 1837 length, OBJ_OP_READ, NULL);
e93f3152
AE
1838 if (!parent_request)
1839 return NULL;
1840
1841 img_request_child_set(parent_request);
1842 rbd_obj_request_get(obj_request);
1843 parent_request->obj_request = obj_request;
1844
1845 return parent_request;
1846}
1847
1848static void rbd_parent_request_destroy(struct kref *kref)
1849{
1850 struct rbd_img_request *parent_request;
1851 struct rbd_obj_request *orig_request;
1852
1853 parent_request = container_of(kref, struct rbd_img_request, kref);
1854 orig_request = parent_request->obj_request;
1855
1856 parent_request->obj_request = NULL;
1857 rbd_obj_request_put(orig_request);
1858 img_request_child_clear(parent_request);
1859
1860 rbd_img_request_destroy(kref);
1861}
1862
1217857f
AE
1863static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1864{
6365d33a 1865 struct rbd_img_request *img_request;
1217857f
AE
1866 unsigned int xferred;
1867 int result;
8b3e1a56 1868 bool more;
1217857f 1869
6365d33a
AE
1870 rbd_assert(obj_request_img_data_test(obj_request));
1871 img_request = obj_request->img_request;
1872
1217857f
AE
1873 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1874 xferred = (unsigned int)obj_request->xferred;
1875 result = obj_request->result;
1876 if (result) {
1877 struct rbd_device *rbd_dev = img_request->rbd_dev;
1878
9584d508 1879 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
9bb0248d 1880 obj_op_name(img_request->op_type), obj_request->length,
6d2940c8 1881 obj_request->img_offset, obj_request->offset);
9584d508 1882 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
1883 result, xferred);
1884 if (!img_request->result)
1885 img_request->result = result;
082a75da
ID
1886 /*
1887 * Need to end I/O on the entire obj_request worth of
1888 * bytes in case of error.
1889 */
1890 xferred = obj_request->length;
1217857f
AE
1891 }
1892
8b3e1a56
AE
1893 if (img_request_child_test(img_request)) {
1894 rbd_assert(img_request->obj_request != NULL);
1895 more = obj_request->which < img_request->obj_request_count - 1;
1896 } else {
2a842aca
CH
1897 blk_status_t status = errno_to_blk_status(result);
1898
8b3e1a56 1899 rbd_assert(img_request->rq != NULL);
7ad18afa 1900
2a842aca 1901 more = blk_update_request(img_request->rq, status, xferred);
7ad18afa 1902 if (!more)
2a842aca 1903 __blk_mq_end_request(img_request->rq, status);
8b3e1a56
AE
1904 }
1905
1906 return more;
1217857f
AE
1907}
1908
2169238d
AE
1909static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1910{
1911 struct rbd_img_request *img_request;
1912 u32 which = obj_request->which;
1913 bool more = true;
1914
6365d33a 1915 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
1916 img_request = obj_request->img_request;
1917
1918 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1919 rbd_assert(img_request != NULL);
2169238d
AE
1920 rbd_assert(img_request->obj_request_count > 0);
1921 rbd_assert(which != BAD_WHICH);
1922 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
1923
1924 spin_lock_irq(&img_request->completion_lock);
1925 if (which != img_request->next_completion)
1926 goto out;
1927
1928 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
1929 rbd_assert(more);
1930 rbd_assert(which < img_request->obj_request_count);
1931
1932 if (!obj_request_done_test(obj_request))
1933 break;
1217857f 1934 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
1935 which++;
1936 }
1937
1938 rbd_assert(more ^ (which == img_request->obj_request_count));
1939 img_request->next_completion = which;
1940out:
1941 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 1942 rbd_img_request_put(img_request);
2169238d
AE
1943
1944 if (!more)
1945 rbd_img_request_complete(img_request);
1946}
1947
3da691bf
ID
1948static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1949{
1950 switch (obj_req->type) {
1951 case OBJ_REQUEST_BIO:
1952 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1953 &obj_req->bio_pos,
1954 obj_req->length);
1955 break;
1956 case OBJ_REQUEST_BVECS:
1957 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
1958 obj_req->length);
1959 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1960 &obj_req->bvec_pos);
1961 break;
1962 default:
1963 rbd_assert(0);
1964 }
1965}
1966
1967static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1968{
a162b308 1969 obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
3da691bf
ID
1970 if (!obj_req->osd_req)
1971 return -ENOMEM;
1972
1973 osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1974 obj_req->offset, obj_req->length, 0, 0);
1975 rbd_osd_req_setup_data(obj_req, 0);
1976
1977 rbd_osd_req_format_read(obj_req);
1978 return 0;
1979}
1980
1981static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1982 unsigned int which)
1983{
1984 struct page **pages;
1985
1986 /*
1987 * The response data for a STAT call consists of:
1988 * le64 length;
1989 * struct {
1990 * le32 tv_sec;
1991 * le32 tv_nsec;
1992 * } mtime;
1993 */
1994 pages = ceph_alloc_page_vector(1, GFP_NOIO);
1995 if (IS_ERR(pages))
1996 return PTR_ERR(pages);
1997
1998 osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1999 osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
2000 8 + sizeof(struct ceph_timespec),
2001 0, false, true);
2002 return 0;
2003}
2004
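/*
 * Illustrative sketch (not called anywhere in the driver): how the
 * STAT reply described above could be decoded from the reply page.
 * The ceph_decode_*() helpers are used the same way elsewhere in
 * this file; rbd_decode_stat_reply() is a hypothetical name.
 */
static void __maybe_unused rbd_decode_stat_reply(struct page *reply_page,
						 u64 *length, u32 *tv_sec,
						 u32 *tv_nsec)
{
	void *p = page_address(reply_page);

	*length = ceph_decode_64(&p);	/* le64 length */
	*tv_sec = ceph_decode_32(&p);	/* le32 mtime.tv_sec */
	*tv_nsec = ceph_decode_32(&p);	/* le32 mtime.tv_nsec */
}
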
2005static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
2006 unsigned int which)
2007{
2008 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2009 u16 opcode;
2010
2011 osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
2012 rbd_dev->layout.object_size,
2013 rbd_dev->layout.object_size);
2014
2015 if (rbd_obj_is_entire(obj_req))
2016 opcode = CEPH_OSD_OP_WRITEFULL;
2017 else
2018 opcode = CEPH_OSD_OP_WRITE;
2019
2020 osd_req_op_extent_init(obj_req->osd_req, which, opcode,
2021 obj_req->offset, obj_req->length, 0, 0);
2022 rbd_osd_req_setup_data(obj_req, which++);
2023
2024 rbd_assert(which == obj_req->osd_req->r_num_ops);
2025 rbd_osd_req_format_write(obj_req);
2026}
2027
2028static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
2029{
3da691bf
ID
2030 unsigned int num_osd_ops, which = 0;
2031 int ret;
2032
2033 if (obj_request_overlaps_parent(obj_req)) {
2034 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2035 num_osd_ops = 3; /* stat + setallochint + write/writefull */
2036 } else {
2037 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2038 num_osd_ops = 2; /* setallochint + write/writefull */
2039 }
2040
a162b308 2041 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
2042 if (!obj_req->osd_req)
2043 return -ENOMEM;
2044
2045 if (obj_request_overlaps_parent(obj_req)) {
2046 ret = __rbd_obj_setup_stat(obj_req, which++);
2047 if (ret)
2048 return ret;
2049 }
2050
2051 __rbd_obj_setup_write(obj_req, which);
2052 return 0;
2053}
2054
2055static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
2056 unsigned int which)
2057{
2058 u16 opcode;
2059
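	/*
	 * Pick the cheapest OSD op that still honours any parent data:
	 * delete the whole object when nothing needs to be preserved,
	 * truncate when the discard covers an entire object above a
	 * parent overlap or runs to the end of the object, and zero
	 * only when a middle range is discarded.
	 */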
2060 if (rbd_obj_is_entire(obj_req)) {
2061 if (obj_request_overlaps_parent(obj_req)) {
2062 opcode = CEPH_OSD_OP_TRUNCATE;
2063 } else {
2064 osd_req_op_init(obj_req->osd_req, which++,
2065 CEPH_OSD_OP_DELETE, 0);
2066 opcode = 0;
2067 }
2068 } else if (rbd_obj_is_tail(obj_req)) {
2069 opcode = CEPH_OSD_OP_TRUNCATE;
2070 } else {
2071 opcode = CEPH_OSD_OP_ZERO;
2072 }
2073
2074 if (opcode)
2075 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
2076 obj_req->offset, obj_req->length,
2077 0, 0);
2078
2079 rbd_assert(which == obj_req->osd_req->r_num_ops);
2080 rbd_osd_req_format_write(obj_req);
2081}
2082
2083static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
2084{
3da691bf
ID
2085 unsigned int num_osd_ops, which = 0;
2086 int ret;
2087
2088 if (rbd_obj_is_entire(obj_req)) {
2089 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2090 num_osd_ops = 1; /* truncate/delete */
2091 } else {
2092 if (obj_request_overlaps_parent(obj_req)) {
2093 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2094 num_osd_ops = 2; /* stat + truncate/zero */
2095 } else {
2096 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2097 num_osd_ops = 1; /* truncate/zero */
2098 }
2099 }
2100
a162b308 2101 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
2102 if (!obj_req->osd_req)
2103 return -ENOMEM;
2104
2105 if (!rbd_obj_is_entire(obj_req) &&
2106 obj_request_overlaps_parent(obj_req)) {
2107 ret = __rbd_obj_setup_stat(obj_req, which++);
2108 if (ret)
2109 return ret;
2110 }
2111
2112 __rbd_obj_setup_discard(obj_req, which);
2113 return 0;
2114}
2115
2116/*
2117 * For each object request in @img_req, allocate an OSD request, add
2118 * individual OSD ops and prepare them for submission. The number of
2119 * OSD ops depends on op_type and the overlap point (if any).
2120 */
2121static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2122{
2123 struct rbd_obj_request *obj_req;
2124 int ret;
2125
2126 for_each_obj_request(img_req, obj_req) {
9bb0248d 2127 switch (img_req->op_type) {
3da691bf
ID
2128 case OBJ_OP_READ:
2129 ret = rbd_obj_setup_read(obj_req);
2130 break;
2131 case OBJ_OP_WRITE:
2132 ret = rbd_obj_setup_write(obj_req);
2133 break;
2134 case OBJ_OP_DISCARD:
2135 ret = rbd_obj_setup_discard(obj_req);
2136 break;
2137 default:
2138 rbd_assert(0);
2139 }
2140 if (ret)
2141 return ret;
2142 }
2143
2144 return 0;
2145}
2146
f1a4739f
AE
2147/*
2148 * Split up an image request into one or more object requests, each
2149 * to a different object.  The "type" parameter indicates whether
2150 * "data_desc" points to a ceph_bio_iter or to a ceph_bvec_iter
2151 * positioned at the start of the data.  In either case this
2152 * function assumes data_desc describes memory sufficient to hold
2153 * all data described by the image request.
2154 */
2155static int rbd_img_request_fill(struct rbd_img_request *img_request,
2156 enum obj_request_type type,
2157 void *data_desc)
bf0d5f50
AE
2158{
2159 struct rbd_device *rbd_dev = img_request->rbd_dev;
2160 struct rbd_obj_request *obj_request = NULL;
2161 struct rbd_obj_request *next_obj_request;
5359a17d 2162 struct ceph_bio_iter bio_it;
7e07efb1 2163 struct ceph_bvec_iter bvec_it;
7da22d29 2164 u64 img_offset;
bf0d5f50 2165 u64 resid;
bf0d5f50 2166
f1a4739f
AE
2167 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2168 (int)type, data_desc);
37206ee5 2169
7da22d29 2170 img_offset = img_request->offset;
bf0d5f50 2171 resid = img_request->length;
4dda41d3 2172 rbd_assert(resid > 0);
f1a4739f
AE
2173
2174 if (type == OBJ_REQUEST_BIO) {
5359a17d 2175 bio_it = *(struct ceph_bio_iter *)data_desc;
4f024f37 2176 rbd_assert(img_offset ==
5359a17d 2177 bio_it.iter.bi_sector << SECTOR_SHIFT);
7e07efb1
ID
2178 } else if (type == OBJ_REQUEST_BVECS) {
2179 bvec_it = *(struct ceph_bvec_iter *)data_desc;
f1a4739f
AE
2180 }
2181
bf0d5f50 2182 while (resid) {
a90bb0c1 2183 u64 object_no = img_offset >> rbd_dev->header.obj_order;
67e2b652
ID
2184 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2185 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2186
6c696d85 2187 obj_request = rbd_obj_request_create(type);
bf0d5f50
AE
2188 if (!obj_request)
2189 goto out_unwind;
62054da6 2190
a90bb0c1 2191 obj_request->object_no = object_no;
67e2b652
ID
2192 obj_request->offset = offset;
2193 obj_request->length = length;
2194
03507db6
JD
2195 /*
2196 * set obj_request->img_request before creating the
2197 * osd_request so that it gets the right snapc
2198 */
2199 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2200
f1a4739f 2201 if (type == OBJ_REQUEST_BIO) {
5359a17d
ID
2202 obj_request->bio_pos = bio_it;
2203 ceph_bio_iter_advance(&bio_it, length);
7e07efb1
ID
2204 } else if (type == OBJ_REQUEST_BVECS) {
2205 obj_request->bvec_pos = bvec_it;
2206 ceph_bvec_iter_shorten(&obj_request->bvec_pos, length);
2207 ceph_bvec_iter_advance(&bvec_it, length);
f1a4739f 2208 }
bf0d5f50 2209
2169238d 2210 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2211 obj_request->img_offset = img_offset;
9d4df01f 2212
7da22d29 2213 img_offset += length;
bf0d5f50
AE
2214 resid -= length;
2215 }
2216
3da691bf 2217 return __rbd_img_fill_request(img_request);
bf0d5f50 2218
bf0d5f50
AE
2219out_unwind:
2220 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2221 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2222
2223 return -ENOMEM;
2224}
2225
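/*
 * Worked example of the split above (assuming the usual power-of-two
 * object layout): with the default 4M objects (obj_order == 22), an
 * 8K request at image offset 0x3ff000 straddles two objects and
 * becomes
 *
 *	object_no 0: offset 0x3ff000, length 0x1000
 *	object_no 1: offset 0x0,      length 0x1000
 */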
efbd1a11 2226static void rbd_img_request_submit(struct rbd_img_request *img_request)
bf0d5f50 2227{
bf0d5f50
AE
2228 struct rbd_obj_request *obj_request;
2229
37206ee5 2230 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2231
663ae2cc 2232 rbd_img_request_get(img_request);
efbd1a11 2233 for_each_obj_request(img_request, obj_request)
3da691bf 2234 rbd_obj_request_submit(obj_request);
bf0d5f50 2235
663ae2cc 2236 rbd_img_request_put(img_request);
bf0d5f50 2237}
8b3e1a56 2238
3da691bf
ID
2239static void rbd_img_end_child_request(struct rbd_img_request *img_req);
2240
2241static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req,
2242 u64 img_offset, u32 bytes)
2243{
2244 struct rbd_img_request *img_req = obj_req->img_request;
2245 struct rbd_img_request *child_img_req;
2246 int ret;
2247
2248 child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes);
2249 if (!child_img_req)
2250 return -ENOMEM;
2251
2252 child_img_req->callback = rbd_img_end_child_request;
2253
2254 if (!rbd_img_is_write(img_req)) {
2255 switch (obj_req->type) {
2256 case OBJ_REQUEST_BIO:
2257 ret = rbd_img_request_fill(child_img_req,
2258 OBJ_REQUEST_BIO,
2259 &obj_req->bio_pos);
2260 break;
2261 case OBJ_REQUEST_BVECS:
2262 ret = rbd_img_request_fill(child_img_req,
2263 OBJ_REQUEST_BVECS,
2264 &obj_req->bvec_pos);
2265 break;
2266 default:
2267 rbd_assert(0);
2268 }
2269 } else {
2270 struct ceph_bvec_iter it = {
2271 .bvecs = obj_req->copyup_bvecs,
2272 .iter = { .bi_size = bytes },
2273 };
2274
2275 ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS,
2276 &it);
2277 }
2278 if (ret) {
2279 rbd_img_request_put(child_img_req);
2280 return ret;
2281 }
2282
2283 rbd_img_request_submit(child_img_req);
2284 return 0;
2285}
2286
2287static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2288{
2289 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2290 int ret;
2291
2292 if (obj_req->result == -ENOENT &&
2293 obj_req->img_offset < rbd_dev->parent_overlap &&
2294 !obj_req->tried_parent) {
2295 u64 obj_overlap = min(obj_req->length,
2296 rbd_dev->parent_overlap - obj_req->img_offset);
2297
2298 obj_req->tried_parent = true;
2299 ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset,
2300 obj_overlap);
2301 if (ret) {
2302 obj_req->result = ret;
2303 return true;
2304 }
2305 return false;
2306 }
2307
2308 /*
2309 * -ENOENT means a hole in the image -- zero-fill the entire
2310 * length of the request. A short read also implies zero-fill
2311 * to the end of the request. In both cases we update xferred
2312 * count to indicate the whole request was satisfied.
2313 */
2314 if (obj_req->result == -ENOENT ||
2315 (!obj_req->result && obj_req->xferred < obj_req->length)) {
2316 rbd_assert(!obj_req->xferred || !obj_req->result);
2317 rbd_obj_zero_range(obj_req, obj_req->xferred,
2318 obj_req->length - obj_req->xferred);
2319 obj_req->result = 0;
2320 obj_req->xferred = obj_req->length;
2321 }
2322
2323 return true;
2324}
2325
2326/*
2327 * copyup_bvecs pages are never highmem pages
2328 */
2329static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2330{
2331 struct ceph_bvec_iter it = {
2332 .bvecs = bvecs,
2333 .iter = { .bi_size = bytes },
2334 };
2335
2336 ceph_bvec_iter_advance_step(&it, bytes, ({
2337 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2338 bv.bv_len))
2339 return false;
2340 }));
2341 return true;
2342}
2343
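/*
 * memchr_inv() returns NULL only when every byte in the range equals
 * the given byte, so the check above detects an all-zero bvec with a
 * single call instead of an explicit loop such as:
 *
 *	for (i = 0; i < bv.bv_len; i++)
 *		if (buf[i])
 *			return false;
 */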
2344static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2345{
3da691bf
ID
2346 unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2347
2348 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2349 rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2350 rbd_osd_req_destroy(obj_req->osd_req);
2351
2352 /*
2353 * Create a copyup request with the same number of OSD ops as
2354 * the original request. The original request was stat + op(s),
2355 * the new copyup request will be copyup + the same op(s).
2356 */
a162b308 2357 obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
3da691bf
ID
2358 if (!obj_req->osd_req)
2359 return -ENOMEM;
2360
2361 /*
2362 * Only send non-zero copyup data to save some I/O and network
2363 * bandwidth -- zero copyup data is equivalent to the object not
2364 * existing.
2365 */
2366 if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2367 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2368 bytes = 0;
2369 }
2370
2371 osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2372 "copyup");
2373 osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2374 obj_req->copyup_bvecs, bytes);
2375
9bb0248d 2376 switch (obj_req->img_request->op_type) {
3da691bf
ID
2377 case OBJ_OP_WRITE:
2378 __rbd_obj_setup_write(obj_req, 1);
2379 break;
2380 case OBJ_OP_DISCARD:
2381 rbd_assert(!rbd_obj_is_entire(obj_req));
2382 __rbd_obj_setup_discard(obj_req, 1);
2383 break;
2384 default:
2385 rbd_assert(0);
2386 }
2387
2388 rbd_obj_request_submit(obj_req);
2389 /* FIXME: in lieu of rbd_img_obj_callback() */
2390 rbd_img_request_put(obj_req->img_request);
2391 return 0;
2392}
2393
7e07efb1
ID
2394static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2395{
2396 u32 i;
2397
2398 rbd_assert(!obj_req->copyup_bvecs);
2399 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2400 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2401 sizeof(*obj_req->copyup_bvecs),
2402 GFP_NOIO);
2403 if (!obj_req->copyup_bvecs)
2404 return -ENOMEM;
2405
2406 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2407 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2408
2409 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2410 if (!obj_req->copyup_bvecs[i].bv_page)
2411 return -ENOMEM;
2412
2413 obj_req->copyup_bvecs[i].bv_offset = 0;
2414 obj_req->copyup_bvecs[i].bv_len = len;
2415 obj_overlap -= len;
2416 }
2417
2418 rbd_assert(!obj_overlap);
2419 return 0;
2420}
2421
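/*
 * For example (assuming 4K pages), a 4M object overlap allocates
 * calc_pages_for(0, 4M) == 1024 single-page bvecs, each with
 * bv_len == PAGE_SIZE; a 10K overlap allocates three, the last
 * with bv_len == 2K.
 */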
3da691bf
ID
2422static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2423{
2424 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2425 u64 img_offset;
2426 u64 obj_overlap;
2427 int ret;
2428
2429 if (!obj_request_overlaps_parent(obj_req)) {
2430 /*
2431 * The overlap has become 0 (most likely because the
2432 * image has been flattened). Use rbd_obj_issue_copyup()
2433 * to re-submit the original write request -- the copyup
2434 * operation itself will be a no-op, since someone must
2435 * have populated the child object while we weren't
2436 * looking. Move to WRITE_FLAT state as we'll be done
2437 * with the operation once the null copyup completes.
2438 */
2439 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2440 return rbd_obj_issue_copyup(obj_req, 0);
2441 }
2442
2443 /*
2444 * Determine the byte range covered by the object in the
2445 * child image to which the original request was to be sent.
2446 */
2447 img_offset = obj_req->img_offset - obj_req->offset;
2448 obj_overlap = rbd_dev->layout.object_size;
2449
2450 /*
2451 * There is no defined parent data beyond the parent
2452 * overlap, so limit what we read at that boundary if
2453 * necessary.
2454 */
2455 if (img_offset + obj_overlap > rbd_dev->parent_overlap) {
2456 rbd_assert(img_offset < rbd_dev->parent_overlap);
2457 obj_overlap = rbd_dev->parent_overlap - img_offset;
2458 }
2459
2460 ret = setup_copyup_bvecs(obj_req, obj_overlap);
2461 if (ret)
2462 return ret;
2463
2464 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2465 return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap);
2466}
2467
2468static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2469{
2470 int ret;
2471
2472again:
2473 switch (obj_req->write_state) {
2474 case RBD_OBJ_WRITE_GUARD:
2475 rbd_assert(!obj_req->xferred);
2476 if (obj_req->result == -ENOENT) {
2477 /*
2478 * The target object doesn't exist. Read the data for
2479 * the entire target object up to the overlap point (if
2480 * any) from the parent, so we can use it for a copyup.
2481 */
2482 ret = rbd_obj_handle_write_guard(obj_req);
2483 if (ret) {
2484 obj_req->result = ret;
2485 return true;
2486 }
2487 return false;
2488 }
2489 /* fall through */
2490 case RBD_OBJ_WRITE_FLAT:
2491 if (!obj_req->result)
2492 /*
2493 * There is no such thing as a successful short
2494 * write -- indicate the whole request was satisfied.
2495 */
2496 obj_req->xferred = obj_req->length;
2497 return true;
2498 case RBD_OBJ_WRITE_COPYUP:
2499 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2500 if (obj_req->result)
2501 goto again;
2502
2503 rbd_assert(obj_req->xferred);
2504 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2505 if (ret) {
2506 obj_req->result = ret;
2507 return true;
2508 }
2509 return false;
2510 default:
2511 rbd_assert(0);
2512 }
2513}
2514
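/*
 * The write state machine driven above, together with
 * rbd_obj_issue_copyup():
 *
 *	FLAT:   plain write completed -> done (short writes impossible)
 *	GUARD:  stat guard returned -ENOENT -> read parent data and
 *	        move to COPYUP
 *	COPYUP: parent read completed -> issue copyup + original op(s)
 *	        and move back to GUARD, so the copyup completion is
 *	        finished off via the FLAT fall-through
 */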
2515/*
2516 * Returns true if @obj_req is completed, or false otherwise.
2517 */
2518static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2519{
9bb0248d 2520 switch (obj_req->img_request->op_type) {
3da691bf
ID
2521 case OBJ_OP_READ:
2522 return rbd_obj_handle_read(obj_req);
2523 case OBJ_OP_WRITE:
2524 return rbd_obj_handle_write(obj_req);
2525 case OBJ_OP_DISCARD:
2526 if (rbd_obj_handle_write(obj_req)) {
2527 /*
2528 * Hide -ENOENT from delete/truncate/zero -- discarding
2529 * a non-existent object is not a problem.
2530 */
2531 if (obj_req->result == -ENOENT) {
2532 obj_req->result = 0;
2533 obj_req->xferred = obj_req->length;
2534 }
2535 return true;
2536 }
2537 return false;
2538 default:
2539 rbd_assert(0);
2540 }
2541}
2542
2543static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2544{
2545 struct rbd_obj_request *obj_req = img_req->obj_request;
2546
2547 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2548
2549 obj_req->result = img_req->result;
2550 obj_req->xferred = img_req->xferred;
2551 rbd_img_request_put(img_req);
2552
2553 rbd_obj_handle_request(obj_req);
2554}
2555
2556static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2557{
2558 if (!__rbd_obj_handle_request(obj_req))
2559 return;
2560
2561 obj_request_done_set(obj_req);
2562 rbd_obj_request_complete(obj_req);
2563}
2564
ed95b21a 2565static const struct rbd_client_id rbd_empty_cid;
b8d70035 2566
ed95b21a
ID
2567static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2568 const struct rbd_client_id *rhs)
2569{
2570 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2571}
2572
2573static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2574{
2575 struct rbd_client_id cid;
2576
2577 mutex_lock(&rbd_dev->watch_mutex);
2578 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2579 cid.handle = rbd_dev->watch_cookie;
2580 mutex_unlock(&rbd_dev->watch_mutex);
2581 return cid;
2582}
2583
2584/*
2585 * lock_rwsem must be held for write
2586 */
2587static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2588 const struct rbd_client_id *cid)
2589{
2590 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2591 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2592 cid->gid, cid->handle);
2593 rbd_dev->owner_cid = *cid; /* struct */
2594}
2595
2596static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2597{
2598 mutex_lock(&rbd_dev->watch_mutex);
2599 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2600 mutex_unlock(&rbd_dev->watch_mutex);
2601}
2602
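/*
 * For example, with a (hypothetical) watch cookie of 18446 the cookie
 * written above is the string "<RBD_LOCK_COOKIE_PREFIX> 18446".
 * rbd_lock() stores it in rbd_dev->lock_cookie, and find_watcher()
 * parses the numeric part back out with sscanf().
 */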
edd8ca80
FM
2603static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2604{
2605 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2606
2607 strcpy(rbd_dev->lock_cookie, cookie);
2608 rbd_set_owner_cid(rbd_dev, &cid);
2609 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2610}
2611
ed95b21a
ID
2612/*
2613 * lock_rwsem must be held for write
2614 */
2615static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 2616{
922dab61 2617 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 2618 char cookie[32];
e627db08 2619 int ret;
b8d70035 2620
cbbfb0ff
ID
2621 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2622 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 2623
ed95b21a
ID
2624 format_lock_cookie(rbd_dev, cookie);
2625 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2626 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2627 RBD_LOCK_TAG, "", 0);
e627db08 2628 if (ret)
ed95b21a 2629 return ret;
b8d70035 2630
ed95b21a 2631 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
edd8ca80 2632 __rbd_lock(rbd_dev, cookie);
ed95b21a 2633 return 0;
b8d70035
AE
2634}
2635
ed95b21a
ID
2636/*
2637 * lock_rwsem must be held for write
2638 */
bbead745 2639static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 2640{
922dab61 2641 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
2642 int ret;
2643
cbbfb0ff
ID
2644 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2645 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 2646
ed95b21a 2647 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 2648 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
2649 if (ret && ret != -ENOENT)
2650 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 2651
bbead745
ID
2652 /* treat errors as the image is unlocked */
2653 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 2654 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
2655 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2656 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
2657}
2658
ed95b21a
ID
2659static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2660 enum rbd_notify_op notify_op,
2661 struct page ***preply_pages,
2662 size_t *preply_len)
9969ebc5
AE
2663{
2664 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
2665 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2666 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
2667 char buf[buf_size];
2668 void *p = buf;
9969ebc5 2669
ed95b21a 2670 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 2671
ed95b21a
ID
2672 /* encode *LockPayload NotifyMessage (op + ClientId) */
2673 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2674 ceph_encode_32(&p, notify_op);
2675 ceph_encode_64(&p, cid.gid);
2676 ceph_encode_64(&p, cid.handle);
8eb87565 2677
ed95b21a
ID
2678 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2679 &rbd_dev->header_oloc, buf, buf_size,
2680 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
2681}
2682
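/*
 * Wire format of the NotifyMessage built above:
 *
 *	u8 struct_v, u8 compat_v, le32 len	encoding header
 *	le32 notify_op
 *	le64 cid.gid
 *	le64 cid.handle
 *
 * hence buf_size is 4 + 8 + 8 payload bytes plus
 * CEPH_ENCODING_START_BLK_LEN for the header.
 */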
ed95b21a
ID
2683static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2684 enum rbd_notify_op notify_op)
b30a01f2 2685{
ed95b21a
ID
2686 struct page **reply_pages;
2687 size_t reply_len;
b30a01f2 2688
ed95b21a
ID
2689 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2690 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2691}
b30a01f2 2692
ed95b21a
ID
2693static void rbd_notify_acquired_lock(struct work_struct *work)
2694{
2695 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2696 acquired_lock_work);
76756a51 2697
ed95b21a 2698 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
2699}
2700
ed95b21a 2701static void rbd_notify_released_lock(struct work_struct *work)
c525f036 2702{
ed95b21a
ID
2703 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2704 released_lock_work);
811c6688 2705
ed95b21a 2706 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
2707}
2708
ed95b21a 2709static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 2710{
ed95b21a
ID
2711 struct page **reply_pages;
2712 size_t reply_len;
2713 bool lock_owner_responded = false;
36be9a76
AE
2714 int ret;
2715
ed95b21a 2716 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 2717
ed95b21a
ID
2718 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2719 &reply_pages, &reply_len);
2720 if (ret && ret != -ETIMEDOUT) {
2721 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 2722 goto out;
ed95b21a 2723 }
36be9a76 2724
ed95b21a
ID
2725 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2726 void *p = page_address(reply_pages[0]);
2727 void *const end = p + reply_len;
2728 u32 n;
36be9a76 2729
ed95b21a
ID
2730 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2731 while (n--) {
2732 u8 struct_v;
2733 u32 len;
36be9a76 2734
ed95b21a
ID
2735 ceph_decode_need(&p, end, 8 + 8, e_inval);
2736 p += 8 + 8; /* skip gid and cookie */
04017e29 2737
ed95b21a
ID
2738 ceph_decode_32_safe(&p, end, len, e_inval);
2739 if (!len)
2740 continue;
2741
2742 if (lock_owner_responded) {
2743 rbd_warn(rbd_dev,
2744 "duplicate lock owners detected");
2745 ret = -EIO;
2746 goto out;
2747 }
2748
2749 lock_owner_responded = true;
2750 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2751 &struct_v, &len);
2752 if (ret) {
2753 rbd_warn(rbd_dev,
2754 "failed to decode ResponseMessage: %d",
2755 ret);
2756 goto e_inval;
2757 }
2758
2759 ret = ceph_decode_32(&p);
2760 }
2761 }
2762
2763 if (!lock_owner_responded) {
2764 rbd_warn(rbd_dev, "no lock owners detected");
2765 ret = -ETIMEDOUT;
2766 }
2767
2768out:
2769 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2770 return ret;
2771
2772e_inval:
2773 ret = -EINVAL;
2774 goto out;
2775}
2776
2777static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2778{
2779 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2780
2781 cancel_delayed_work(&rbd_dev->lock_dwork);
2782 if (wake_all)
2783 wake_up_all(&rbd_dev->lock_waitq);
2784 else
2785 wake_up(&rbd_dev->lock_waitq);
2786}
2787
2788static int get_lock_owner_info(struct rbd_device *rbd_dev,
2789 struct ceph_locker **lockers, u32 *num_lockers)
2790{
2791 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2792 u8 lock_type;
2793 char *lock_tag;
2794 int ret;
2795
2796 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2797
2798 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2799 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2800 &lock_type, &lock_tag, lockers, num_lockers);
2801 if (ret)
2802 return ret;
2803
2804 if (*num_lockers == 0) {
2805 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2806 goto out;
2807 }
2808
2809 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2810 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2811 lock_tag);
2812 ret = -EBUSY;
2813 goto out;
2814 }
2815
2816 if (lock_type == CEPH_CLS_LOCK_SHARED) {
2817 rbd_warn(rbd_dev, "shared lock type detected");
2818 ret = -EBUSY;
2819 goto out;
2820 }
2821
2822 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2823 strlen(RBD_LOCK_COOKIE_PREFIX))) {
2824 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2825 (*lockers)[0].id.cookie);
2826 ret = -EBUSY;
2827 goto out;
2828 }
2829
2830out:
2831 kfree(lock_tag);
2832 return ret;
2833}
2834
2835static int find_watcher(struct rbd_device *rbd_dev,
2836 const struct ceph_locker *locker)
2837{
2838 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2839 struct ceph_watch_item *watchers;
2840 u32 num_watchers;
2841 u64 cookie;
2842 int i;
2843 int ret;
2844
2845 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2846 &rbd_dev->header_oloc, &watchers,
2847 &num_watchers);
2848 if (ret)
2849 return ret;
2850
2851 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2852 for (i = 0; i < num_watchers; i++) {
2853 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2854 sizeof(locker->info.addr)) &&
2855 watchers[i].cookie == cookie) {
2856 struct rbd_client_id cid = {
2857 .gid = le64_to_cpu(watchers[i].name.num),
2858 .handle = cookie,
2859 };
2860
2861 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2862 rbd_dev, cid.gid, cid.handle);
2863 rbd_set_owner_cid(rbd_dev, &cid);
2864 ret = 1;
2865 goto out;
2866 }
2867 }
2868
2869 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2870 ret = 0;
2871out:
2872 kfree(watchers);
2873 return ret;
2874}
2875
2876/*
2877 * lock_rwsem must be held for write
2878 */
2879static int rbd_try_lock(struct rbd_device *rbd_dev)
2880{
2881 struct ceph_client *client = rbd_dev->rbd_client->client;
2882 struct ceph_locker *lockers;
2883 u32 num_lockers;
2884 int ret;
2885
2886 for (;;) {
2887 ret = rbd_lock(rbd_dev);
2888 if (ret != -EBUSY)
2889 return ret;
2890
2891 /* determine if the current lock holder is still alive */
2892 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2893 if (ret)
2894 return ret;
2895
2896 if (num_lockers == 0)
2897 goto again;
2898
2899 ret = find_watcher(rbd_dev, lockers);
2900 if (ret) {
2901 if (ret > 0)
2902 ret = 0; /* have to request lock */
2903 goto out;
2904 }
2905
2906 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2907 ENTITY_NAME(lockers[0].id.name));
2908
2909 ret = ceph_monc_blacklist_add(&client->monc,
2910 &lockers[0].info.addr);
2911 if (ret) {
2912 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2913 ENTITY_NAME(lockers[0].id.name), ret);
2914 goto out;
2915 }
2916
2917 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2918 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2919 lockers[0].id.cookie,
2920 &lockers[0].id.name);
2921 if (ret && ret != -ENOENT)
2922 goto out;
2923
2924again:
2925 ceph_free_lockers(lockers, num_lockers);
2926 }
2927
2928out:
2929 ceph_free_lockers(lockers, num_lockers);
2930 return ret;
2931}
2932
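/*
 * In short: keep trying to take the lock; if it is held in the
 * expected rbd style but the holder no longer has a watch on the
 * header object, assume that client is dead, blacklist it and break
 * the lock, then retry.  A holder with a live watch must instead be
 * asked to release the lock via rbd_request_lock().
 */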
2933/*
2934 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2935 */
2936static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2937 int *pret)
2938{
2939 enum rbd_lock_state lock_state;
2940
2941 down_read(&rbd_dev->lock_rwsem);
2942 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
2943 rbd_dev->lock_state);
2944 if (__rbd_is_lock_owner(rbd_dev)) {
2945 lock_state = rbd_dev->lock_state;
2946 up_read(&rbd_dev->lock_rwsem);
2947 return lock_state;
2948 }
2949
2950 up_read(&rbd_dev->lock_rwsem);
2951 down_write(&rbd_dev->lock_rwsem);
2952 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
2953 rbd_dev->lock_state);
2954 if (!__rbd_is_lock_owner(rbd_dev)) {
2955 *pret = rbd_try_lock(rbd_dev);
2956 if (*pret)
2957 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
2958 }
2959
2960 lock_state = rbd_dev->lock_state;
2961 up_write(&rbd_dev->lock_rwsem);
2962 return lock_state;
2963}
2964
2965static void rbd_acquire_lock(struct work_struct *work)
2966{
2967 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
2968 struct rbd_device, lock_dwork);
2969 enum rbd_lock_state lock_state;
37f13252 2970 int ret = 0;
ed95b21a
ID
2971
2972 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2973again:
2974 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
2975 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
2976 if (lock_state == RBD_LOCK_STATE_LOCKED)
2977 wake_requests(rbd_dev, true);
2978 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
2979 rbd_dev, lock_state, ret);
2980 return;
2981 }
2982
2983 ret = rbd_request_lock(rbd_dev);
2984 if (ret == -ETIMEDOUT) {
2985 goto again; /* treat this as a dead client */
e010dd0a
ID
2986 } else if (ret == -EROFS) {
2987 rbd_warn(rbd_dev, "peer will not release lock");
2988 /*
2989 * If this is rbd_add_acquire_lock(), we want to fail
2990 * immediately -- reuse BLACKLISTED flag. Otherwise we
2991 * want to block.
2992 */
2993 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
2994 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
2995 /* wake "rbd map --exclusive" process */
2996 wake_requests(rbd_dev, false);
2997 }
ed95b21a
ID
2998 } else if (ret < 0) {
2999 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3000 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3001 RBD_RETRY_DELAY);
3002 } else {
3003 /*
3004 * lock owner acked, but resend if we don't see them
3005 * release the lock
3006 */
3007 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3008 rbd_dev);
3009 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3010 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3011 }
3012}
3013
3014/*
3015 * lock_rwsem must be held for write
3016 */
3017static bool rbd_release_lock(struct rbd_device *rbd_dev)
3018{
3019 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3020 rbd_dev->lock_state);
3021 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3022 return false;
3023
3024 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3025 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3026 /*
ed95b21a 3027 * Ensure that all in-flight IO is flushed.
52bb1f9b 3028 *
ed95b21a
ID
3029 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3030 * may be shared with other devices.
52bb1f9b 3031 */
ed95b21a
ID
3032 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3033 up_read(&rbd_dev->lock_rwsem);
3034
3035 down_write(&rbd_dev->lock_rwsem);
3036 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3037 rbd_dev->lock_state);
3038 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3039 return false;
3040
bbead745
ID
3041 rbd_unlock(rbd_dev);
3042 /*
3043 * Give others a chance to grab the lock - we would re-acquire
3044 * almost immediately if we got new IO during ceph_osdc_sync()
3045 * otherwise. We need to ack our own notifications, so this
3046 * lock_dwork will be requeued from rbd_wait_state_locked()
3047 * after wake_requests() in rbd_handle_released_lock().
3048 */
3049 cancel_delayed_work(&rbd_dev->lock_dwork);
ed95b21a
ID
3050 return true;
3051}
3052
3053static void rbd_release_lock_work(struct work_struct *work)
3054{
3055 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3056 unlock_work);
3057
3058 down_write(&rbd_dev->lock_rwsem);
3059 rbd_release_lock(rbd_dev);
3060 up_write(&rbd_dev->lock_rwsem);
3061}
3062
3063static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3064 void **p)
3065{
3066 struct rbd_client_id cid = { 0 };
3067
3068 if (struct_v >= 2) {
3069 cid.gid = ceph_decode_64(p);
3070 cid.handle = ceph_decode_64(p);
3071 }
3072
3073 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3074 cid.handle);
3075 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3076 down_write(&rbd_dev->lock_rwsem);
3077 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3078 /*
3079 * we already know that the remote client is
3080 * the owner
3081 */
3082 up_write(&rbd_dev->lock_rwsem);
3083 return;
3084 }
3085
3086 rbd_set_owner_cid(rbd_dev, &cid);
3087 downgrade_write(&rbd_dev->lock_rwsem);
3088 } else {
3089 down_read(&rbd_dev->lock_rwsem);
3090 }
3091
3092 if (!__rbd_is_lock_owner(rbd_dev))
3093 wake_requests(rbd_dev, false);
3094 up_read(&rbd_dev->lock_rwsem);
3095}
3096
3097static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3098 void **p)
3099{
3100 struct rbd_client_id cid = { 0 };
3101
3102 if (struct_v >= 2) {
3103 cid.gid = ceph_decode_64(p);
3104 cid.handle = ceph_decode_64(p);
3105 }
3106
3107 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3108 cid.handle);
3109 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3110 down_write(&rbd_dev->lock_rwsem);
3111 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3112 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3113 __func__, rbd_dev, cid.gid, cid.handle,
3114 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3115 up_write(&rbd_dev->lock_rwsem);
3116 return;
3117 }
3118
3119 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3120 downgrade_write(&rbd_dev->lock_rwsem);
3121 } else {
3122 down_read(&rbd_dev->lock_rwsem);
3123 }
3124
3125 if (!__rbd_is_lock_owner(rbd_dev))
3126 wake_requests(rbd_dev, false);
3127 up_read(&rbd_dev->lock_rwsem);
3128}
3129
3b77faa0
ID
3130/*
3131 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3132 * ResponseMessage is needed.
3133 */
3134static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3135 void **p)
ed95b21a
ID
3136{
3137 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3138 struct rbd_client_id cid = { 0 };
3b77faa0 3139 int result = 1;
ed95b21a
ID
3140
3141 if (struct_v >= 2) {
3142 cid.gid = ceph_decode_64(p);
3143 cid.handle = ceph_decode_64(p);
3144 }
3145
3146 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3147 cid.handle);
3148 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3149 return result;
ed95b21a
ID
3150
3151 down_read(&rbd_dev->lock_rwsem);
3b77faa0
ID
3152 if (__rbd_is_lock_owner(rbd_dev)) {
3153 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3154 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3155 goto out_unlock;
3156
3157 /*
3158 * encode ResponseMessage(0) so the peer can detect
3159 * a missing owner
3160 */
3161 result = 0;
3162
3163 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
e010dd0a
ID
3164 if (!rbd_dev->opts->exclusive) {
3165 dout("%s rbd_dev %p queueing unlock_work\n",
3166 __func__, rbd_dev);
3167 queue_work(rbd_dev->task_wq,
3168 &rbd_dev->unlock_work);
3169 } else {
3170 /* refuse to release the lock */
3171 result = -EROFS;
3172 }
ed95b21a
ID
3173 }
3174 }
3b77faa0
ID
3175
3176out_unlock:
ed95b21a 3177 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3178 return result;
ed95b21a
ID
3179}
3180
3181static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3182 u64 notify_id, u64 cookie, s32 *result)
3183{
3184 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3185 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3186 char buf[buf_size];
3187 int ret;
3188
3189 if (result) {
3190 void *p = buf;
3191
3192 /* encode ResponseMessage */
3193 ceph_start_encoding(&p, 1, 1,
3194 buf_size - CEPH_ENCODING_START_BLK_LEN);
3195 ceph_encode_32(&p, *result);
3196 } else {
3197 buf_size = 0;
3198 }
b8d70035 3199
922dab61
ID
3200 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3201 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3202 buf, buf_size);
52bb1f9b 3203 if (ret)
ed95b21a
ID
3204 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3205}
3206
3207static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3208 u64 cookie)
3209{
3210 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3211 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3212}
3213
3214static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3215 u64 notify_id, u64 cookie, s32 result)
3216{
3217 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3218 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3219}
3220
3221static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3222 u64 notifier_id, void *data, size_t data_len)
3223{
3224 struct rbd_device *rbd_dev = arg;
3225 void *p = data;
3226 void *const end = p + data_len;
d4c2269b 3227 u8 struct_v = 0;
ed95b21a
ID
3228 u32 len;
3229 u32 notify_op;
3230 int ret;
3231
3232 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3233 __func__, rbd_dev, cookie, notify_id, data_len);
3234 if (data_len) {
3235 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3236 &struct_v, &len);
3237 if (ret) {
3238 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3239 ret);
3240 return;
3241 }
3242
3243 notify_op = ceph_decode_32(&p);
3244 } else {
3245 /* legacy notification for header updates */
3246 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3247 len = 0;
3248 }
3249
3250 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3251 switch (notify_op) {
3252 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3253 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3254 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3255 break;
3256 case RBD_NOTIFY_OP_RELEASED_LOCK:
3257 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3258 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3259 break;
3260 case RBD_NOTIFY_OP_REQUEST_LOCK:
3b77faa0
ID
3261 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3262 if (ret <= 0)
ed95b21a 3263 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3264 cookie, ret);
ed95b21a
ID
3265 else
3266 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3267 break;
3268 case RBD_NOTIFY_OP_HEADER_UPDATE:
3269 ret = rbd_dev_refresh(rbd_dev);
3270 if (ret)
3271 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3272
3273 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3274 break;
3275 default:
3276 if (rbd_is_lock_owner(rbd_dev))
3277 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3278 cookie, -EOPNOTSUPP);
3279 else
3280 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3281 break;
3282 }
b8d70035
AE
3283}
3284
99d16943
ID
3285static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3286
922dab61 3287static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3288{
922dab61 3289 struct rbd_device *rbd_dev = arg;
bb040aa0 3290
922dab61 3291 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3292
ed95b21a
ID
3293 down_write(&rbd_dev->lock_rwsem);
3294 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3295 up_write(&rbd_dev->lock_rwsem);
3296
99d16943
ID
3297 mutex_lock(&rbd_dev->watch_mutex);
3298 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3299 __rbd_unregister_watch(rbd_dev);
3300 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3301
99d16943 3302 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3303 }
99d16943 3304 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3305}
3306
9969ebc5 3307/*
99d16943 3308 * watch_mutex must be locked
9969ebc5 3309 */
99d16943 3310static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3311{
3312 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3313 struct ceph_osd_linger_request *handle;
9969ebc5 3314
922dab61 3315 rbd_assert(!rbd_dev->watch_handle);
99d16943 3316 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3317
922dab61
ID
3318 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3319 &rbd_dev->header_oloc, rbd_watch_cb,
3320 rbd_watch_errcb, rbd_dev);
3321 if (IS_ERR(handle))
3322 return PTR_ERR(handle);
8eb87565 3323
922dab61 3324 rbd_dev->watch_handle = handle;
b30a01f2 3325 return 0;
b30a01f2
ID
3326}
3327
99d16943
ID
3328/*
3329 * watch_mutex must be locked
3330 */
3331static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3332{
922dab61
ID
3333 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3334 int ret;
b30a01f2 3335
99d16943
ID
3336 rbd_assert(rbd_dev->watch_handle);
3337 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3338
922dab61
ID
3339 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3340 if (ret)
3341 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3342
922dab61 3343 rbd_dev->watch_handle = NULL;
c525f036
ID
3344}
3345
99d16943
ID
3346static int rbd_register_watch(struct rbd_device *rbd_dev)
3347{
3348 int ret;
3349
3350 mutex_lock(&rbd_dev->watch_mutex);
3351 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3352 ret = __rbd_register_watch(rbd_dev);
3353 if (ret)
3354 goto out;
3355
3356 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3357 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3358
3359out:
3360 mutex_unlock(&rbd_dev->watch_mutex);
3361 return ret;
3362}
3363
3364static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3365{
99d16943
ID
3366 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3367
3368 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ed95b21a
ID
3369 cancel_work_sync(&rbd_dev->acquired_lock_work);
3370 cancel_work_sync(&rbd_dev->released_lock_work);
3371 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3372 cancel_work_sync(&rbd_dev->unlock_work);
99d16943
ID
3373}
3374
3375static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3376{
ed95b21a 3377 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
99d16943
ID
3378 cancel_tasks_sync(rbd_dev);
3379
3380 mutex_lock(&rbd_dev->watch_mutex);
3381 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3382 __rbd_unregister_watch(rbd_dev);
3383 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3384 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3385
811c6688 3386 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3387}
3388
14bb211d
ID
3389/*
3390 * lock_rwsem must be held for write
3391 */
3392static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3393{
3394 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3395 char cookie[32];
3396 int ret;
3397
3398 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3399
3400 format_lock_cookie(rbd_dev, cookie);
3401 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3402 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3403 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3404 RBD_LOCK_TAG, cookie);
3405 if (ret) {
3406 if (ret != -EOPNOTSUPP)
3407 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3408 ret);
3409
3410 /*
3411 * Lock cookie cannot be updated on older OSDs, so do
3412 * a manual release and queue an acquire.
3413 */
3414 if (rbd_release_lock(rbd_dev))
3415 queue_delayed_work(rbd_dev->task_wq,
3416 &rbd_dev->lock_dwork, 0);
3417 } else {
edd8ca80 3418 __rbd_lock(rbd_dev, cookie);
14bb211d
ID
3419 }
3420}
3421
99d16943
ID
3422static void rbd_reregister_watch(struct work_struct *work)
3423{
3424 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3425 struct rbd_device, watch_dwork);
3426 int ret;
3427
3428 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3429
3430 mutex_lock(&rbd_dev->watch_mutex);
87c0fded
ID
3431 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3432 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3433 return;
87c0fded 3434 }
99d16943
ID
3435
3436 ret = __rbd_register_watch(rbd_dev);
3437 if (ret) {
3438 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4d73644b 3439 if (ret == -EBLACKLISTED || ret == -ENOENT) {
87c0fded 3440 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
14bb211d 3441 wake_requests(rbd_dev, true);
87c0fded 3442 } else {
99d16943
ID
3443 queue_delayed_work(rbd_dev->task_wq,
3444 &rbd_dev->watch_dwork,
3445 RBD_RETRY_DELAY);
87c0fded
ID
3446 }
3447 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3448 return;
99d16943
ID
3449 }
3450
3451 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3452 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3453 mutex_unlock(&rbd_dev->watch_mutex);
3454
14bb211d
ID
3455 down_write(&rbd_dev->lock_rwsem);
3456 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3457 rbd_reacquire_lock(rbd_dev);
3458 up_write(&rbd_dev->lock_rwsem);
3459
99d16943
ID
3460 ret = rbd_dev_refresh(rbd_dev);
3461 if (ret)
3462 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
99d16943
ID
3463}
3464
36be9a76 3465/*
f40eb349
AE
3466 * Synchronous osd object method call. Returns the number of bytes
3467 * returned in the outbound buffer, or a negative error code.
36be9a76
AE
3468 */
3469static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
ecd4a68a
ID
3470 struct ceph_object_id *oid,
3471 struct ceph_object_locator *oloc,
36be9a76 3472 const char *method_name,
4157976b 3473 const void *outbound,
36be9a76 3474 size_t outbound_size,
4157976b 3475 void *inbound,
e2a58ee5 3476 size_t inbound_size)
36be9a76 3477{
ecd4a68a
ID
3478 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3479 struct page *req_page = NULL;
3480 struct page *reply_page;
36be9a76
AE
3481 int ret;
3482
3483 /*
6010a451
AE
3484 * Method calls are ultimately read operations. The result
3485 * should be placed into the inbound buffer provided.  They
3486 * also supply outbound data--parameters for the object
3487 * method.  Currently, if outbound data is present, it will
3488 * be a snapshot id.
36be9a76 3489 */
ecd4a68a
ID
3490 if (outbound) {
3491 if (outbound_size > PAGE_SIZE)
3492 return -E2BIG;
36be9a76 3493
ecd4a68a
ID
3494 req_page = alloc_page(GFP_KERNEL);
3495 if (!req_page)
3496 return -ENOMEM;
04017e29 3497
ecd4a68a 3498 memcpy(page_address(req_page), outbound, outbound_size);
04017e29 3499 }
36be9a76 3500
ecd4a68a
ID
3501 reply_page = alloc_page(GFP_KERNEL);
3502 if (!reply_page) {
3503 if (req_page)
3504 __free_page(req_page);
3505 return -ENOMEM;
3506 }
57385b51 3507
ecd4a68a
ID
3508 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3509 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3510 reply_page, &inbound_size);
3511 if (!ret) {
3512 memcpy(inbound, page_address(reply_page), inbound_size);
3513 ret = inbound_size;
3514 }
36be9a76 3515
ecd4a68a
ID
3516 if (req_page)
3517 __free_page(req_page);
3518 __free_page(reply_page);
36be9a76
AE
3519 return ret;
3520}
3521
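/*
 * Illustrative call (buffer names hypothetical): a typical user
 * passes a snapshot id as the outbound data and decodes the
 * method-specific reply from the inbound buffer, e.g.
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 *
 * where a negative return is an error and a non-negative return is
 * the number of reply bytes copied into &size_buf.
 */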
ed95b21a
ID
3522/*
3523 * lock_rwsem must be held for read
3524 */
3525static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3526{
3527 DEFINE_WAIT(wait);
3528
3529 do {
3530 /*
3531 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3532 * and cancel_delayed_work() in wake_requests().
3533 */
3534 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3535 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3536 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3537 TASK_UNINTERRUPTIBLE);
3538 up_read(&rbd_dev->lock_rwsem);
3539 schedule();
3540 down_read(&rbd_dev->lock_rwsem);
3541 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3542 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3543
3544 finish_wait(&rbd_dev->lock_waitq, &wait);
3545}
3546
7ad18afa 3547static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 3548{
3549 struct request *rq = blk_mq_rq_from_pdu(work);
3550 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 3551 struct rbd_img_request *img_request;
4e752f0a 3552 struct ceph_snap_context *snapc = NULL;
3553 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3554 u64 length = blk_rq_bytes(rq);
6d2940c8 3555 enum obj_operation_type op_type;
4e752f0a 3556 u64 mapping_size;
80de1912 3557 bool must_be_locked;
3558 int result;
3559
3560 switch (req_op(rq)) {
3561 case REQ_OP_DISCARD:
6ac56951 3562 case REQ_OP_WRITE_ZEROES:
90e98c52 3563 op_type = OBJ_OP_DISCARD;
3564 break;
3565 case REQ_OP_WRITE:
6d2940c8 3566 op_type = OBJ_OP_WRITE;
3567 break;
3568 case REQ_OP_READ:
6d2940c8 3569 op_type = OBJ_OP_READ;
3570 break;
3571 default:
3572 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3573 result = -EIO;
3574 goto err;
3575 }
6d2940c8 3576
bc1ecc65 3577 /* Ignore/skip any zero-length requests */
bf0d5f50 3578
3579 if (!length) {
3580 dout("%s: zero-length request\n", __func__);
3581 result = 0;
3582 goto err_rq;
3583 }
bf0d5f50 3584
3585 rbd_assert(op_type == OBJ_OP_READ ||
3586 rbd_dev->spec->snap_id == CEPH_NOSNAP);
4dda41d3 3587
3588 /*
3589 * Quit early if the mapped snapshot no longer exists. It's
3590 * still possible the snapshot will have disappeared by the
3591 * time our request arrives at the osd, but there's no sense in
3592 * sending it if we already know.
3593 */
3594 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3595 dout("request for non-existent snapshot");
3596 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3597 result = -ENXIO;
3598 goto err_rq;
3599 }
4dda41d3 3600
3601 if (offset && length > U64_MAX - offset + 1) {
3602 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3603 length);
3604 result = -EINVAL;
3605 goto err_rq; /* Shouldn't happen */
3606 }
4dda41d3 3607
3608 blk_mq_start_request(rq);
3609
3610 down_read(&rbd_dev->header_rwsem);
3611 mapping_size = rbd_dev->mapping.size;
6d2940c8 3612 if (op_type != OBJ_OP_READ) {
3613 snapc = rbd_dev->header.snapc;
3614 ceph_get_snap_context(snapc);
3615 }
3616 up_read(&rbd_dev->header_rwsem);
3617
3618 if (offset + length > mapping_size) {
bc1ecc65 3619 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 3620 length, mapping_size);
3621 result = -EIO;
3622 goto err_rq;
3623 }
bf0d5f50 3624
3625 must_be_locked =
3626 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3627 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3628 if (must_be_locked) {
3629 down_read(&rbd_dev->lock_rwsem);
87c0fded 3630 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3631 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3632 if (rbd_dev->opts->exclusive) {
3633 rbd_warn(rbd_dev, "exclusive lock required");
3634 result = -EROFS;
3635 goto err_unlock;
3636 }
ed95b21a 3637 rbd_wait_state_locked(rbd_dev);
e010dd0a 3638 }
3639 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3640 result = -EBLACKLISTED;
3641 goto err_unlock;
3642 }
3643 }
3644
6d2940c8 3645 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 3646 snapc);
3647 if (!img_request) {
3648 result = -ENOMEM;
ed95b21a 3649 goto err_unlock;
3650 }
3651 img_request->rq = rq;
70b16db8 3652 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 3653
3654 if (op_type == OBJ_OP_DISCARD)
3655 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3656 NULL);
3657 else {
3658 struct ceph_bio_iter bio_it = { .bio = rq->bio,
3659 .iter = rq->bio->bi_iter };
3660
90e98c52 3661 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3662 &bio_it);
3663 }
3664 if (result)
3665 goto err_img_request;
bf0d5f50 3666
efbd1a11 3667 rbd_img_request_submit(img_request);
3668 if (must_be_locked)
3669 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 3670 return;
bf0d5f50 3671
3672err_img_request:
3673 rbd_img_request_put(img_request);
3674err_unlock:
3675 if (must_be_locked)
3676 up_read(&rbd_dev->lock_rwsem);
3677err_rq:
3678 if (result)
3679 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 3680 obj_op_name(op_type), length, offset, result);
e96a650a 3681 ceph_put_snap_context(snapc);
7ad18afa 3682err:
2a842aca 3683 blk_mq_end_request(rq, errno_to_blk_status(result));
bc1ecc65 3684}
bf0d5f50 3685
fc17b653 3686static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
7ad18afa 3687 const struct blk_mq_queue_data *bd)
bc1ecc65 3688{
3689 struct request *rq = bd->rq;
3690 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 3691
7ad18afa 3692 queue_work(rbd_wq, work);
fc17b653 3693 return BLK_STS_OK;
3694}
3695
3696static void rbd_free_disk(struct rbd_device *rbd_dev)
3697{
3698 blk_cleanup_queue(rbd_dev->disk->queue);
3699 blk_mq_free_tag_set(&rbd_dev->tag_set);
3700 put_disk(rbd_dev->disk);
a0cab924 3701 rbd_dev->disk = NULL;
3702}
3703
788e2df3 3704static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3705 struct ceph_object_id *oid,
3706 struct ceph_object_locator *oloc,
3707 void *buf, int buf_len)
3708
3709{
3710 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3711 struct ceph_osd_request *req;
3712 struct page **pages;
3713 int num_pages = calc_pages_for(0, buf_len);
3714 int ret;
3715
3716 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3717 if (!req)
3718 return -ENOMEM;
788e2df3 3719
3720 ceph_oid_copy(&req->r_base_oid, oid);
3721 ceph_oloc_copy(&req->r_base_oloc, oloc);
3722 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 3723
fe5478e0 3724 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 3725 if (ret)
fe5478e0 3726 goto out_req;
788e2df3 3727
3728 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3729 if (IS_ERR(pages)) {
3730 ret = PTR_ERR(pages);
3731 goto out_req;
3732 }
1ceae7ef 3733
3734 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3735 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3736 true);
3737
3738 ceph_osdc_start_request(osdc, req, false);
3739 ret = ceph_osdc_wait_request(osdc, req);
3740 if (ret >= 0)
3741 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 3742
3743out_req:
3744 ceph_osdc_put_request(req);
3745 return ret;
3746}
3747
602adf40 3748/*
3749 * Read the complete header for the given rbd device. On successful
3750 * return, the rbd_dev->header field will contain up-to-date
3751 * information about the image.
602adf40 3752 */
99a41ebc 3753static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3754{
4156d998 3755 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3756 u32 snap_count = 0;
3757 u64 names_size = 0;
3758 u32 want_count;
3759 int ret;
602adf40 3760
00f1f36f 3761 /*
3762 * The complete header will include an array of its 64-bit
3763 * snapshot ids, followed by the names of those snapshots as
3764 * a contiguous block of NUL-terminated strings. Note that
3765 * the number of snapshots could change by the time we read
3766 * it in, in which case we re-read it.
00f1f36f 3767 */
3768 do {
3769 size_t size;
3770
3771 kfree(ondisk);
3772
3773 size = sizeof (*ondisk);
3774 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3775 size += names_size;
3776 ondisk = kmalloc(size, GFP_KERNEL);
3777 if (!ondisk)
662518b1 3778 return -ENOMEM;
4156d998 3779
3780 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3781 &rbd_dev->header_oloc, ondisk, size);
4156d998 3782 if (ret < 0)
662518b1 3783 goto out;
c0cd10db 3784 if ((size_t)ret < size) {
4156d998 3785 ret = -ENXIO;
3786 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3787 size, ret);
662518b1 3788 goto out;
3789 }
3790 if (!rbd_dev_ondisk_valid(ondisk)) {
3791 ret = -ENXIO;
06ecc6cb 3792 rbd_warn(rbd_dev, "invalid header");
662518b1 3793 goto out;
81e759fb 3794 }
602adf40 3795
3796 names_size = le64_to_cpu(ondisk->snap_names_len);
3797 want_count = snap_count;
3798 snap_count = le32_to_cpu(ondisk->snap_count);
3799 } while (snap_count != want_count);
00f1f36f 3800
3801 ret = rbd_header_from_disk(rbd_dev, ondisk);
3802out:
3803 kfree(ondisk);
3804
3805 return ret;
3806}
3807
3808/*
3809 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3810 * has disappeared from the (just updated) snapshot context.
3811 */
3812static void rbd_exists_validate(struct rbd_device *rbd_dev)
3813{
3814 u64 snap_id;
3815
3816 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3817 return;
3818
3819 snap_id = rbd_dev->spec->snap_id;
3820 if (snap_id == CEPH_NOSNAP)
3821 return;
3822
3823 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3824 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3825}
3826
3827static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3828{
3829 sector_t size;
3830
3831 /*
3832 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3833 * try to update its size. If REMOVING is set, updating size
3834 * is just useless work since the device can't be opened.
9875201e 3835 */
3836 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3837 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3838 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3839 dout("setting size to %llu sectors", (unsigned long long)size);
3840 set_capacity(rbd_dev->disk, size);
3841 revalidate_disk(rbd_dev->disk);
3842 }
3843}
3844
cc4a38bd 3845static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3846{
e627db08 3847 u64 mapping_size;
3848 int ret;
3849
cfbf6377 3850 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 3851 mapping_size = rbd_dev->mapping.size;
3852
3853 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 3854 if (ret)
73e39e4d 3855 goto out;
15228ede 3856
3857 /*
3858 * If there is a parent, see if it has disappeared due to the
3859 * mapped image getting flattened.
3860 */
3861 if (rbd_dev->parent) {
3862 ret = rbd_dev_v2_parent_info(rbd_dev);
3863 if (ret)
73e39e4d 3864 goto out;
3865 }
3866
5ff1108c 3867 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 3868 rbd_dev->mapping.size = rbd_dev->header.image_size;
3869 } else {
3870 /* validate mapped snapshot's EXISTS flag */
3871 rbd_exists_validate(rbd_dev);
3872 }
15228ede 3873
73e39e4d 3874out:
cfbf6377 3875 up_write(&rbd_dev->header_rwsem);
73e39e4d 3876 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 3877 rbd_dev_update_size(rbd_dev);
1fe5e993 3878
73e39e4d 3879 return ret;
3880}
3881
3882static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3883 unsigned int hctx_idx, unsigned int numa_node)
3884{
3885 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3886
3887 INIT_WORK(work, rbd_queue_workfn);
3888 return 0;
3889}
3890
f363b089 3891static const struct blk_mq_ops rbd_mq_ops = {
7ad18afa 3892 .queue_rq = rbd_queue_rq,
3893 .init_request = rbd_init_request,
3894};
3895
3896static int rbd_init_disk(struct rbd_device *rbd_dev)
3897{
3898 struct gendisk *disk;
3899 struct request_queue *q;
593a9e7b 3900 u64 segment_size;
7ad18afa 3901 int err;
602adf40 3902
602adf40 3903 /* create gendisk info */
3904 disk = alloc_disk(single_major ?
3905 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3906 RBD_MINORS_PER_MAJOR);
602adf40 3907 if (!disk)
1fcdb8aa 3908 return -ENOMEM;
602adf40 3909
f0f8cef5 3910 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3911 rbd_dev->dev_id);
602adf40 3912 disk->major = rbd_dev->major;
dd82fff1 3913 disk->first_minor = rbd_dev->minor;
3914 if (single_major)
3915 disk->flags |= GENHD_FL_EXT_DEVT;
3916 disk->fops = &rbd_bd_ops;
3917 disk->private_data = rbd_dev;
3918
3919 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3920 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 3921 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 3922 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 3923 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3924 rbd_dev->tag_set.nr_hw_queues = 1;
3925 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3926
3927 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3928 if (err)
602adf40 3929 goto out_disk;
029bcbd8 3930
3931 q = blk_mq_init_queue(&rbd_dev->tag_set);
3932 if (IS_ERR(q)) {
3933 err = PTR_ERR(q);
3934 goto out_tag_set;
3935 }
3936
3937 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
3938 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 3939
029bcbd8 3940 /* set io sizes to object size */
3941 segment_size = rbd_obj_bytes(&rbd_dev->header);
3942 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 3943 q->limits.max_sectors = queue_max_hw_sectors(q);
21acdf45 3944 blk_queue_max_segments(q, USHRT_MAX);
24f1df60 3945 blk_queue_max_segment_size(q, UINT_MAX);
3946 blk_queue_io_min(q, segment_size);
3947 blk_queue_io_opt(q, segment_size);
029bcbd8 3948
3949 /* enable the discard support */
3950 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3951 q->limits.discard_granularity = segment_size;
2bb4cd5c 3952 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
6ac56951 3953 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
90e98c52 3954
bae818ee 3955 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
dc3b17cc 3956 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
bae818ee 3957
3958 /*
3959 * disk_release() expects a queue ref from add_disk() and will
3960 * put it. Hold an extra ref until add_disk() is called.
3961 */
3962 WARN_ON(!blk_get_queue(q));
602adf40 3963 disk->queue = q;
3964 q->queuedata = rbd_dev;
3965
3966 rbd_dev->disk = disk;
602adf40 3967
602adf40 3968 return 0;
3969out_tag_set:
3970 blk_mq_free_tag_set(&rbd_dev->tag_set);
3971out_disk:
3972 put_disk(disk);
7ad18afa 3973 return err;
3974}
3975
3976/*
3977 sysfs
3978*/
3979
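/*
 * Each mapped image is exposed under /sys/bus/rbd/devices/<dev_id>/
 * through the attributes defined below. For example (illustrative
 * shell session for device id 0):
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	$ cat /sys/bus/rbd/devices/0/pool
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 */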
3980static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3981{
3982 return container_of(dev, struct rbd_device, dev);
3983}
3984
3985static ssize_t rbd_size_show(struct device *dev,
3986 struct device_attribute *attr, char *buf)
3987{
593a9e7b 3988 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3989
3990 return sprintf(buf, "%llu\n",
3991 (unsigned long long)rbd_dev->mapping.size);
3992}
3993
3994/*
3995 * Note this shows the features for whatever's mapped, which is not
3996 * necessarily the base image.
3997 */
3998static ssize_t rbd_features_show(struct device *dev,
3999 struct device_attribute *attr, char *buf)
4000{
4001 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4002
4003 return sprintf(buf, "0x%016llx\n",
fc71d833 4004 (unsigned long long)rbd_dev->mapping.features);
4005}
4006
4007static ssize_t rbd_major_show(struct device *dev,
4008 struct device_attribute *attr, char *buf)
4009{
593a9e7b 4010 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4011
4012 if (rbd_dev->major)
4013 return sprintf(buf, "%d\n", rbd_dev->major);
4014
4015 return sprintf(buf, "(none)\n");
4016}
4017
4018static ssize_t rbd_minor_show(struct device *dev,
4019 struct device_attribute *attr, char *buf)
4020{
4021 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4022
dd82fff1 4023 return sprintf(buf, "%d\n", rbd_dev->minor);
4024}
4025
4026static ssize_t rbd_client_addr_show(struct device *dev,
4027 struct device_attribute *attr, char *buf)
4028{
4029 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4030 struct ceph_entity_addr *client_addr =
4031 ceph_client_addr(rbd_dev->rbd_client->client);
4032
4033 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4034 le32_to_cpu(client_addr->nonce));
4035}
4036
4037static ssize_t rbd_client_id_show(struct device *dev,
4038 struct device_attribute *attr, char *buf)
602adf40 4039{
593a9e7b 4040 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4041
1dbb4399 4042 return sprintf(buf, "client%lld\n",
033268a5 4043 ceph_client_gid(rbd_dev->rbd_client->client));
4044}
4045
4046static ssize_t rbd_cluster_fsid_show(struct device *dev,
4047 struct device_attribute *attr, char *buf)
4048{
4049 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4050
4051 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4052}
4053
4054static ssize_t rbd_config_info_show(struct device *dev,
4055 struct device_attribute *attr, char *buf)
4056{
4057 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4058
4059 return sprintf(buf, "%s\n", rbd_dev->config_info);
4060}
4061
4062static ssize_t rbd_pool_show(struct device *dev,
4063 struct device_attribute *attr, char *buf)
602adf40 4064{
593a9e7b 4065 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4066
0d7dbfce 4067 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4068}
4069
4070static ssize_t rbd_pool_id_show(struct device *dev,
4071 struct device_attribute *attr, char *buf)
4072{
4073 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4074
0d7dbfce 4075 return sprintf(buf, "%llu\n",
fc71d833 4076 (unsigned long long) rbd_dev->spec->pool_id);
4077}
4078
4079static ssize_t rbd_name_show(struct device *dev,
4080 struct device_attribute *attr, char *buf)
4081{
593a9e7b 4082 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4083
4084 if (rbd_dev->spec->image_name)
4085 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4086
4087 return sprintf(buf, "(unknown)\n");
4088}
4089
4090static ssize_t rbd_image_id_show(struct device *dev,
4091 struct device_attribute *attr, char *buf)
4092{
4093 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4094
0d7dbfce 4095 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4096}
4097
4098/*
4099 * Shows the name of the currently-mapped snapshot (or
4100 * RBD_SNAP_HEAD_NAME for the base image).
4101 */
4102static ssize_t rbd_snap_show(struct device *dev,
4103 struct device_attribute *attr,
4104 char *buf)
4105{
593a9e7b 4106 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4107
0d7dbfce 4108 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4109}
4110
4111static ssize_t rbd_snap_id_show(struct device *dev,
4112 struct device_attribute *attr, char *buf)
4113{
4114 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4115
4116 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4117}
4118
86b00e0d 4119/*
4120 * For a v2 image, shows the chain of parent images, separated by empty
4121 * lines. For v1 images or if there is no parent, shows "(no parent
4122 * image)".
4123 */
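/*
 * Example output (hypothetical values; one block per ancestor, with
 * the fields produced by the sprintf format below):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1f6eb7702f3a
 *	image_name parent-image
 *	snap_id 8
 *	snap_name base
 *	overlap 10737418240
 */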
4124static ssize_t rbd_parent_show(struct device *dev,
4125 struct device_attribute *attr,
4126 char *buf)
4127{
4128 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4129 ssize_t count = 0;
86b00e0d 4130
ff96128f 4131 if (!rbd_dev->parent)
4132 return sprintf(buf, "(no parent image)\n");
4133
4134 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4135 struct rbd_spec *spec = rbd_dev->parent_spec;
4136
4137 count += sprintf(&buf[count], "%s"
4138 "pool_id %llu\npool_name %s\n"
4139 "image_id %s\nimage_name %s\n"
4140 "snap_id %llu\nsnap_name %s\n"
4141 "overlap %llu\n",
4142 !count ? "" : "\n", /* first? */
4143 spec->pool_id, spec->pool_name,
4144 spec->image_id, spec->image_name ?: "(unknown)",
4145 spec->snap_id, spec->snap_name,
4146 rbd_dev->parent_overlap);
4147 }
4148
4149 return count;
4150}
4151
4152static ssize_t rbd_image_refresh(struct device *dev,
4153 struct device_attribute *attr,
4154 const char *buf,
4155 size_t size)
4156{
593a9e7b 4157 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4158 int ret;
602adf40 4159
cc4a38bd 4160 ret = rbd_dev_refresh(rbd_dev);
e627db08 4161 if (ret)
52bb1f9b 4162 return ret;
b813623a 4163
52bb1f9b 4164 return size;
dfc5606d 4165}
602adf40 4166
dfc5606d 4167static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4168static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4169static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4170static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4171static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4172static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4173static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4174static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4175static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4176static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4177static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4178static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4179static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4180static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4181static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4182static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4183
4184static struct attribute *rbd_attrs[] = {
4185 &dev_attr_size.attr,
34b13184 4186 &dev_attr_features.attr,
dfc5606d 4187 &dev_attr_major.attr,
dd82fff1 4188 &dev_attr_minor.attr,
005a07bf 4189 &dev_attr_client_addr.attr,
dfc5606d 4190 &dev_attr_client_id.attr,
267fb90b 4191 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4192 &dev_attr_config_info.attr,
dfc5606d 4193 &dev_attr_pool.attr,
9bb2f334 4194 &dev_attr_pool_id.attr,
dfc5606d 4195 &dev_attr_name.attr,
589d30e0 4196 &dev_attr_image_id.attr,
dfc5606d 4197 &dev_attr_current_snap.attr,
92a58671 4198 &dev_attr_snap_id.attr,
86b00e0d 4199 &dev_attr_parent.attr,
dfc5606d 4200 &dev_attr_refresh.attr,
4201 NULL
4202};
4203
4204static struct attribute_group rbd_attr_group = {
4205 .attrs = rbd_attrs,
4206};
4207
4208static const struct attribute_group *rbd_attr_groups[] = {
4209 &rbd_attr_group,
4210 NULL
4211};
4212
6cac4695 4213static void rbd_dev_release(struct device *dev);
dfc5606d 4214
b9942bc9 4215static const struct device_type rbd_device_type = {
4216 .name = "rbd",
4217 .groups = rbd_attr_groups,
6cac4695 4218 .release = rbd_dev_release,
4219};
4220
4221static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4222{
4223 kref_get(&spec->kref);
4224
4225 return spec;
4226}
4227
4228static void rbd_spec_free(struct kref *kref);
4229static void rbd_spec_put(struct rbd_spec *spec)
4230{
4231 if (spec)
4232 kref_put(&spec->kref, rbd_spec_free);
4233}
4234
4235static struct rbd_spec *rbd_spec_alloc(void)
4236{
4237 struct rbd_spec *spec;
4238
4239 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4240 if (!spec)
4241 return NULL;
4242
4243 spec->pool_id = CEPH_NOPOOL;
4244 spec->snap_id = CEPH_NOSNAP;
4245 kref_init(&spec->kref);
4246
4247 return spec;
4248}
4249
4250static void rbd_spec_free(struct kref *kref)
4251{
4252 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4253
4254 kfree(spec->pool_name);
4255 kfree(spec->image_id);
4256 kfree(spec->image_name);
4257 kfree(spec->snap_name);
4258 kfree(spec);
4259}
4260
1643dfa4 4261static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4262{
99d16943 4263 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4264 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4265
c41d13a3 4266 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4267 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4268 kfree(rbd_dev->config_info);
c41d13a3 4269
4270 rbd_put_client(rbd_dev->rbd_client);
4271 rbd_spec_put(rbd_dev->spec);
4272 kfree(rbd_dev->opts);
4273 kfree(rbd_dev);
4274}
4275
4276static void rbd_dev_release(struct device *dev)
4277{
4278 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4279 bool need_put = !!rbd_dev->opts;
4280
4281 if (need_put) {
4282 destroy_workqueue(rbd_dev->task_wq);
4283 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4284 }
4285
4286 rbd_dev_free(rbd_dev);
4287
4288 /*
4289 * This is racy, but way better than dropping the module ref outside of
4290 * the release callback. The race window is pretty small, so
4291 * doing something similar to dm (dm-builtin.c) is overkill.
4292 */
4293 if (need_put)
4294 module_put(THIS_MODULE);
4295}
4296
4297static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4298 struct rbd_spec *spec)
4299{
4300 struct rbd_device *rbd_dev;
4301
1643dfa4 4302 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4303 if (!rbd_dev)
4304 return NULL;
4305
4306 spin_lock_init(&rbd_dev->lock);
4307 INIT_LIST_HEAD(&rbd_dev->node);
4308 init_rwsem(&rbd_dev->header_rwsem);
4309
7e97332e 4310 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4311 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4312 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4313
4314 mutex_init(&rbd_dev->watch_mutex);
4315 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4316 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4317
4318 init_rwsem(&rbd_dev->lock_rwsem);
4319 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4320 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4321 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4322 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4323 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4324 init_waitqueue_head(&rbd_dev->lock_waitq);
4325
4326 rbd_dev->dev.bus = &rbd_bus_type;
4327 rbd_dev->dev.type = &rbd_device_type;
4328 rbd_dev->dev.parent = &rbd_root_dev;
4329 device_initialize(&rbd_dev->dev);
4330
c53d5893 4331 rbd_dev->rbd_client = rbdc;
d147543d 4332 rbd_dev->spec = spec;
0903e875 4333
4334 return rbd_dev;
4335}
4336
4337/*
4338 * Create a mapping rbd_dev.
4339 */
4340static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4341 struct rbd_spec *spec,
4342 struct rbd_options *opts)
4343{
4344 struct rbd_device *rbd_dev;
4345
4346 rbd_dev = __rbd_dev_create(rbdc, spec);
4347 if (!rbd_dev)
4348 return NULL;
4349
4350 rbd_dev->opts = opts;
4351
4352 /* get an id and fill in device name */
4353 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4354 minor_to_rbd_dev_id(1 << MINORBITS),
4355 GFP_KERNEL);
4356 if (rbd_dev->dev_id < 0)
4357 goto fail_rbd_dev;
4358
4359 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4360 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4361 rbd_dev->name);
4362 if (!rbd_dev->task_wq)
4363 goto fail_dev_id;
dd5ac32d 4364
4365 /* we have a ref from do_rbd_add() */
4366 __module_get(THIS_MODULE);
dd5ac32d 4367
1643dfa4 4368 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4369 return rbd_dev;
4370
4371fail_dev_id:
4372 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4373fail_rbd_dev:
4374 rbd_dev_free(rbd_dev);
4375 return NULL;
4376}
4377
4378static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4379{
4380 if (rbd_dev)
4381 put_device(&rbd_dev->dev);
4382}
4383
4384/*
4385 * Get the size and object order for an image snapshot, or if
4386 * snap_id is CEPH_NOSNAP, gets this information for the base
4387 * image.
4388 */
4389static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4390 u8 *order, u64 *snap_size)
4391{
4392 __le64 snapid = cpu_to_le64(snap_id);
4393 int ret;
4394 struct {
4395 u8 order;
4396 __le64 size;
4397 } __attribute__ ((packed)) size_buf = { 0 };
4398
4399 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4400 &rbd_dev->header_oloc, "get_size",
4401 &snapid, sizeof(snapid),
4402 &size_buf, sizeof(size_buf));
36be9a76 4403 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4404 if (ret < 0)
4405 return ret;
4406 if (ret < sizeof (size_buf))
4407 return -ERANGE;
9d475de5 4408
c3545579 4409 if (order) {
c86f86e9 4410 *order = size_buf.order;
4411 dout(" order %u", (unsigned int)*order);
4412 }
4413 *snap_size = le64_to_cpu(size_buf.size);
4414
4415 dout(" snap_id 0x%016llx snap_size = %llu\n",
4416 (unsigned long long)snap_id,
57385b51 4417 (unsigned long long)*snap_size);
4418
4419 return 0;
4420}
4421
4422static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4423{
4424 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4425 &rbd_dev->header.obj_order,
4426 &rbd_dev->header.image_size);
4427}
4428
4429static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4430{
4431 void *reply_buf;
4432 int ret;
4433 void *p;
4434
4435 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4436 if (!reply_buf)
4437 return -ENOMEM;
4438
4439 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4440 &rbd_dev->header_oloc, "get_object_prefix",
4441 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4442 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4443 if (ret < 0)
4444 goto out;
4445
4446 p = reply_buf;
4447 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4448 p + ret, NULL, GFP_NOIO);
4449 ret = 0;
4450
4451 if (IS_ERR(rbd_dev->header.object_prefix)) {
4452 ret = PTR_ERR(rbd_dev->header.object_prefix);
4453 rbd_dev->header.object_prefix = NULL;
4454 } else {
4455 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4456 }
4457out:
4458 kfree(reply_buf);
4459
4460 return ret;
4461}
4462
4463static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4464 u64 *snap_features)
4465{
4466 __le64 snapid = cpu_to_le64(snap_id);
4467 struct {
4468 __le64 features;
4469 __le64 incompat;
4157976b 4470 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4471 u64 unsup;
b1b5402a
AE
4472 int ret;
4473
4474 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4475 &rbd_dev->header_oloc, "get_features",
4476 &snapid, sizeof(snapid),
4477 &features_buf, sizeof(features_buf));
36be9a76 4478 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4479 if (ret < 0)
4480 return ret;
4481 if (ret < sizeof (features_buf))
4482 return -ERANGE;
d889140c 4483
4484 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4485 if (unsup) {
4486 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4487 unsup);
b8f5c6ed 4488 return -ENXIO;
d3767f0f 4489 }
d889140c 4490
4491 *snap_features = le64_to_cpu(features_buf.features);
4492
4493 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4494 (unsigned long long)snap_id,
4495 (unsigned long long)*snap_features,
4496 (unsigned long long)le64_to_cpu(features_buf.incompat));
4497
4498 return 0;
4499}
4500
4501static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4502{
4503 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4504 &rbd_dev->header.features);
4505}
4506
4507static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4508{
4509 struct rbd_spec *parent_spec;
4510 size_t size;
4511 void *reply_buf = NULL;
4512 __le64 snapid;
4513 void *p;
4514 void *end;
642a2537 4515 u64 pool_id;
86b00e0d 4516 char *image_id;
3b5cf2a2 4517 u64 snap_id;
86b00e0d 4518 u64 overlap;
4519 int ret;
4520
4521 parent_spec = rbd_spec_alloc();
4522 if (!parent_spec)
4523 return -ENOMEM;
4524
4525 size = sizeof (__le64) + /* pool_id */
4526 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4527 sizeof (__le64) + /* snap_id */
4528 sizeof (__le64); /* overlap */
4529 reply_buf = kmalloc(size, GFP_KERNEL);
4530 if (!reply_buf) {
4531 ret = -ENOMEM;
4532 goto out_err;
4533 }
4534
4d9b67cd 4535 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4536 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4537 &rbd_dev->header_oloc, "get_parent",
4538 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4539 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4540 if (ret < 0)
4541 goto out_err;
4542
86b00e0d 4543 p = reply_buf;
4544 end = reply_buf + ret;
4545 ret = -ERANGE;
642a2537 4546 ceph_decode_64_safe(&p, end, pool_id, out_err);
4547 if (pool_id == CEPH_NOPOOL) {
4548 /*
4549 * Either the parent never existed, or we have
4550 * record of it but the image got flattened so it no
4551 * longer has a parent. When the parent of a
4552 * layered image disappears we immediately set the
4553 * overlap to 0. The effect of this is that all new
4554 * requests will be treated as if the image had no
4555 * parent.
4556 */
4557 if (rbd_dev->parent_overlap) {
4558 rbd_dev->parent_overlap = 0;
4559 rbd_dev_parent_put(rbd_dev);
4560 pr_info("%s: clone image has been flattened\n",
4561 rbd_dev->disk->disk_name);
4562 }
4563
86b00e0d 4564 goto out; /* No parent? No problem. */
392a9dad 4565 }
86b00e0d 4566
4567 /* The ceph file layout needs to fit pool id in 32 bits */
4568
4569 ret = -EIO;
642a2537 4570 if (pool_id > (u64)U32_MAX) {
9584d508 4571 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 4572 (unsigned long long)pool_id, U32_MAX);
57385b51 4573 goto out_err;
c0cd10db 4574 }
0903e875 4575
979ed480 4576 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4577 if (IS_ERR(image_id)) {
4578 ret = PTR_ERR(image_id);
4579 goto out_err;
4580 }
3b5cf2a2 4581 ceph_decode_64_safe(&p, end, snap_id, out_err);
4582 ceph_decode_64_safe(&p, end, overlap, out_err);
4583
4584 /*
4585 * The parent won't change (except when the clone is
4586 * flattened, which is handled above). So we only need to
4587 * record the parent spec if we have not already done so.
4588 */
4589 if (!rbd_dev->parent_spec) {
4590 parent_spec->pool_id = pool_id;
4591 parent_spec->image_id = image_id;
4592 parent_spec->snap_id = snap_id;
4593 rbd_dev->parent_spec = parent_spec;
4594 parent_spec = NULL; /* rbd_dev now owns this */
4595 } else {
4596 kfree(image_id);
4597 }
4598
4599 /*
4600 * We always update the parent overlap. If it's zero we issue
4601 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 4602 */
3b5cf2a2 4603 if (!overlap) {
3b5cf2a2 4604 if (parent_spec) {
4605 /* refresh, careful to warn just once */
4606 if (rbd_dev->parent_overlap)
4607 rbd_warn(rbd_dev,
4608 "clone now standalone (overlap became 0)");
3b5cf2a2 4609 } else {
4610 /* initial probe */
4611 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 4612 }
70cf49cf 4613 }
4614 rbd_dev->parent_overlap = overlap;
4615
4616out:
4617 ret = 0;
4618out_err:
4619 kfree(reply_buf);
4620 rbd_spec_put(parent_spec);
4621
4622 return ret;
4623}
4624
4625static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4626{
4627 struct {
4628 __le64 stripe_unit;
4629 __le64 stripe_count;
4630 } __attribute__ ((packed)) striping_info_buf = { 0 };
4631 size_t size = sizeof (striping_info_buf);
4632 void *p;
4633 u64 obj_size;
4634 u64 stripe_unit;
4635 u64 stripe_count;
4636 int ret;
4637
4638 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4639 &rbd_dev->header_oloc, "get_stripe_unit_count",
4640 NULL, 0, &striping_info_buf, size);
4641 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4642 if (ret < 0)
4643 return ret;
4644 if (ret < size)
4645 return -ERANGE;
4646
4647 /*
4648 * We don't actually support the "fancy striping" feature
4649 * (STRIPINGV2) yet, but if the striping sizes are the
4650 * defaults the behavior is the same as before. So find
4651 * out, and only fail if the image has non-default values.
4652 */
4653 ret = -EINVAL;
5bc3fb17 4654 obj_size = rbd_obj_bytes(&rbd_dev->header);
4655 p = &striping_info_buf;
4656 stripe_unit = ceph_decode_64(&p);
4657 if (stripe_unit != obj_size) {
4658 rbd_warn(rbd_dev, "unsupported stripe unit "
4659 "(got %llu want %llu)",
4660 stripe_unit, obj_size);
4661 return -EINVAL;
4662 }
4663 stripe_count = ceph_decode_64(&p);
4664 if (stripe_count != 1) {
4665 rbd_warn(rbd_dev, "unsupported stripe count "
4666 "(got %llu want 1)", stripe_count);
4667 return -EINVAL;
4668 }
4669 rbd_dev->header.stripe_unit = stripe_unit;
4670 rbd_dev->header.stripe_count = stripe_count;
4671
4672 return 0;
4673}
4674
4675static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4676{
4677 __le64 data_pool_id;
4678 int ret;
4679
4680 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4681 &rbd_dev->header_oloc, "get_data_pool",
4682 NULL, 0, &data_pool_id, sizeof(data_pool_id));
4683 if (ret < 0)
4684 return ret;
4685 if (ret < sizeof(data_pool_id))
4686 return -EBADMSG;
4687
4688 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4689 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4690 return 0;
4691}
4692
4693static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4694{
ecd4a68a 4695 CEPH_DEFINE_OID_ONSTACK(oid);
4696 size_t image_id_size;
4697 char *image_id;
4698 void *p;
4699 void *end;
4700 size_t size;
4701 void *reply_buf = NULL;
4702 size_t len = 0;
4703 char *image_name = NULL;
4704 int ret;
4705
4706 rbd_assert(!rbd_dev->spec->image_name);
4707
4708 len = strlen(rbd_dev->spec->image_id);
4709 image_id_size = sizeof (__le32) + len;
4710 image_id = kmalloc(image_id_size, GFP_KERNEL);
4711 if (!image_id)
4712 return NULL;
4713
4714 p = image_id;
4157976b 4715 end = image_id + image_id_size;
57385b51 4716 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4717
4718 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4719 reply_buf = kmalloc(size, GFP_KERNEL);
4720 if (!reply_buf)
4721 goto out;
4722
4723 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4724 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4725 "dir_get_name", image_id, image_id_size,
4726 reply_buf, size);
4727 if (ret < 0)
4728 goto out;
4729 p = reply_buf;
4730 end = reply_buf + ret;
4731
4732 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4733 if (IS_ERR(image_name))
4734 image_name = NULL;
4735 else
4736 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4737out:
4738 kfree(reply_buf);
4739 kfree(image_id);
4740
4741 return image_name;
4742}
4743
4744static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4745{
4746 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4747 const char *snap_name;
4748 u32 which = 0;
4749
4750 /* Skip over names until we find the one we are looking for */
4751
4752 snap_name = rbd_dev->header.snap_names;
4753 while (which < snapc->num_snaps) {
4754 if (!strcmp(name, snap_name))
4755 return snapc->snaps[which];
4756 snap_name += strlen(snap_name) + 1;
4757 which++;
4758 }
4759 return CEPH_NOSNAP;
4760}
4761
4762static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4763{
4764 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4765 u32 which;
4766 bool found = false;
4767 u64 snap_id;
4768
4769 for (which = 0; !found && which < snapc->num_snaps; which++) {
4770 const char *snap_name;
4771
4772 snap_id = snapc->snaps[which];
4773 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4774 if (IS_ERR(snap_name)) {
4775 /* ignore no-longer existing snapshots */
4776 if (PTR_ERR(snap_name) == -ENOENT)
4777 continue;
4778 else
4779 break;
4780 }
4781 found = !strcmp(name, snap_name);
4782 kfree(snap_name);
4783 }
4784 return found ? snap_id : CEPH_NOSNAP;
4785}
4786
4787/*
4788 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4789 * no snapshot by that name is found, or if an error occurs.
4790 */
4791static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4792{
4793 if (rbd_dev->image_format == 1)
4794 return rbd_v1_snap_id_by_name(rbd_dev, name);
4795
4796 return rbd_v2_snap_id_by_name(rbd_dev, name);
4797}
4798
9e15b77d 4799/*
4800 * An image being mapped will have everything but the snap id.
4801 */
4802static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4803{
4804 struct rbd_spec *spec = rbd_dev->spec;
4805
4806 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4807 rbd_assert(spec->image_id && spec->image_name);
4808 rbd_assert(spec->snap_name);
4809
4810 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4811 u64 snap_id;
4812
4813 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4814 if (snap_id == CEPH_NOSNAP)
4815 return -ENOENT;
4816
4817 spec->snap_id = snap_id;
4818 } else {
4819 spec->snap_id = CEPH_NOSNAP;
4820 }
4821
4822 return 0;
4823}
4824
4825/*
4826 * A parent image will have all ids but none of the names.
e1d4213f 4827 *
4828 * All names in an rbd spec are dynamically allocated. It's OK if we
4829 * can't figure out the name for an image id.
9e15b77d 4830 */
04077599 4831static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 4832{
4833 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4834 struct rbd_spec *spec = rbd_dev->spec;
4835 const char *pool_name;
4836 const char *image_name;
4837 const char *snap_name;
4838 int ret;
4839
4840 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4841 rbd_assert(spec->image_id);
4842 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 4843
2e9f7f1c 4844 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4845
4846 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4847 if (!pool_name) {
4848 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4849 return -EIO;
4850 }
4851 pool_name = kstrdup(pool_name, GFP_KERNEL);
4852 if (!pool_name)
4853 return -ENOMEM;
4854
4855 /* Fetch the image name; tolerate failure here */
4856
4857 image_name = rbd_dev_image_name(rbd_dev);
4858 if (!image_name)
06ecc6cb 4859 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4860
04077599 4861 /* Fetch the snapshot name */
9e15b77d 4862
2e9f7f1c 4863 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4864 if (IS_ERR(snap_name)) {
4865 ret = PTR_ERR(snap_name);
9e15b77d 4866 goto out_err;
4867 }
4868
4869 spec->pool_name = pool_name;
4870 spec->image_name = image_name;
4871 spec->snap_name = snap_name;
4872
4873 return 0;
04077599 4874
9e15b77d 4875out_err:
4876 kfree(image_name);
4877 kfree(pool_name);
4878 return ret;
4879}
4880
cc4a38bd 4881static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4882{
4883 size_t size;
4884 int ret;
4885 void *reply_buf;
4886 void *p;
4887 void *end;
4888 u64 seq;
4889 u32 snap_count;
4890 struct ceph_snap_context *snapc;
4891 u32 i;
4892
4893 /*
4894 * We'll need room for the seq value (maximum snapshot id),
4895 * snapshot count, and array of that many snapshot ids.
4896 * For now we have a fixed upper limit on the number we're
4897 * prepared to receive.
4898 */
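/*
 * Reply wire format, as decoded below (sketch):
 *
 *	__le64 seq			maximum snapshot id
 *	__le32 snap_count		number of snapshot ids that follow
 *	__le64 snaps[snap_count]	the snapshot ids
 */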
4899 size = sizeof (__le64) + sizeof (__le32) +
4900 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4901 reply_buf = kzalloc(size, GFP_KERNEL);
4902 if (!reply_buf)
4903 return -ENOMEM;
4904
4905 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4906 &rbd_dev->header_oloc, "get_snapcontext",
4907 NULL, 0, reply_buf, size);
36be9a76 4908 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4909 if (ret < 0)
4910 goto out;
4911
35d489f9 4912 p = reply_buf;
4913 end = reply_buf + ret;
4914 ret = -ERANGE;
4915 ceph_decode_64_safe(&p, end, seq, out);
4916 ceph_decode_32_safe(&p, end, snap_count, out);
4917
4918 /*
4919 * Make sure the reported number of snapshot ids wouldn't go
4920 * beyond the end of our buffer. But before checking that,
4921 * make sure the computed size of the snapshot context we
4922 * allocate is representable in a size_t.
4923 */
4924 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4925 / sizeof (u64)) {
4926 ret = -EINVAL;
4927 goto out;
4928 }
4929 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4930 goto out;
468521c1 4931 ret = 0;
35d489f9 4932
812164f8 4933 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4934 if (!snapc) {
4935 ret = -ENOMEM;
4936 goto out;
4937 }
35d489f9 4938 snapc->seq = seq;
4939 for (i = 0; i < snap_count; i++)
4940 snapc->snaps[i] = ceph_decode_64(&p);
4941
49ece554 4942 ceph_put_snap_context(rbd_dev->header.snapc);
4943 rbd_dev->header.snapc = snapc;
4944
4945 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4946 (unsigned long long)seq, (unsigned int)snap_count);
4947out:
4948 kfree(reply_buf);
4949
57385b51 4950 return ret;
4951}
4952
4953static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4954 u64 snap_id)
4955{
4956 size_t size;
4957 void *reply_buf;
54cac61f 4958 __le64 snapid;
4959 int ret;
4960 void *p;
4961 void *end;
4962 char *snap_name;
4963
4964 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4965 reply_buf = kmalloc(size, GFP_KERNEL);
4966 if (!reply_buf)
4967 return ERR_PTR(-ENOMEM);
4968
54cac61f 4969 snapid = cpu_to_le64(snap_id);
4970 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4971 &rbd_dev->header_oloc, "get_snapshot_name",
4972 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 4973 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4974 if (ret < 0) {
4975 snap_name = ERR_PTR(ret);
b8b1e2db 4976 goto out;
f40eb349 4977 }
4978
4979 p = reply_buf;
f40eb349 4980 end = reply_buf + ret;
e5c35534 4981 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4982 if (IS_ERR(snap_name))
b8b1e2db 4983 goto out;
b8b1e2db 4984
f40eb349 4985 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4986 (unsigned long long)snap_id, snap_name);
4987out:
4988 kfree(reply_buf);
4989
f40eb349 4990 return snap_name;
4991}
4992
2df3fac7 4993static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4994{
2df3fac7 4995 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4996 int ret;
117973fb 4997
4998 ret = rbd_dev_v2_image_size(rbd_dev);
4999 if (ret)
cfbf6377 5000 return ret;
1617e40c 5001
5002 if (first_time) {
5003 ret = rbd_dev_v2_header_onetime(rbd_dev);
5004 if (ret)
cfbf6377 5005 return ret;
5006 }
5007
cc4a38bd 5008 ret = rbd_dev_v2_snap_context(rbd_dev);
5009 if (ret && first_time) {
5010 kfree(rbd_dev->header.object_prefix);
5011 rbd_dev->header.object_prefix = NULL;
5012 }
5013
5014 return ret;
5015}
5016
5017static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5018{
5019 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5020
5021 if (rbd_dev->image_format == 1)
5022 return rbd_dev_v1_header_info(rbd_dev);
5023
5024 return rbd_dev_v2_header_info(rbd_dev);
5025}
5026
5027/*
5028 * Skips over white space at *buf, and updates *buf to point to the
5029 * first found non-space character (if any). Returns the length of
5030 * the token (string of non-white space characters) found. Note
5031 * that *buf must be terminated with '\0'.
5032 */
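/*
 * E.g. (illustrative): with *buf pointing at "  rbd foo", next_token()
 * returns 3 and leaves *buf pointing at "rbd foo".
 */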
5033static inline size_t next_token(const char **buf)
5034{
5035 /*
5036 * These are the characters that produce nonzero for
5037 * isspace() in the "C" and "POSIX" locales.
5038 */
5039 const char *spaces = " \f\n\r\t\v";
5040
5041 *buf += strspn(*buf, spaces); /* Find start of token */
5042
5043 return strcspn(*buf, spaces); /* Return token length */
5044}
5045
5046/*
5047 * Finds the next token in *buf, dynamically allocates a buffer big
5048 * enough to hold a copy of it, and copies the token into the new
5049 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5050 * that a duplicate buffer is created even for a zero-length token.
5051 *
5052 * Returns a pointer to the newly-allocated duplicate, or a null
5053 * pointer if memory for the duplicate was not available. If
5054 * the lenp argument is a non-null pointer, the length of the token
5055 * (not including the '\0') is returned in *lenp.
5056 *
5057 * If successful, the *buf pointer will be updated to point beyond
5058 * the end of the found token.
5059 *
5060 * Note: uses GFP_KERNEL for allocation.
5061 */
5062static inline char *dup_token(const char **buf, size_t *lenp)
5063{
5064 char *dup;
5065 size_t len;
5066
5067 len = next_token(buf);
4caf35f9 5068 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5069 if (!dup)
5070 return NULL;
5071 *(dup + len) = '\0';
5072 *buf += len;
5073
5074 if (lenp)
5075 *lenp = len;
5076
5077 return dup;
5078}
5079
a725f65e 5080/*
5081 * Parse the options provided for an "rbd add" (i.e., rbd image
5082 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5083 * and the data written is passed here via a NUL-terminated buffer.
5084 * Returns 0 if successful or an error code otherwise.
d22f76e7 5085 *
5086 * The information extracted from these options is recorded in
5087 * the other parameters which return dynamically-allocated
5088 * structures:
5089 * ceph_opts
5090 * The address of a pointer that will refer to a ceph options
5091 * structure. Caller must release the returned pointer using
5092 * ceph_destroy_options() when it is no longer needed.
5093 * rbd_opts
5094 * Address of an rbd options pointer. Fully initialized by
5095 * this function; caller must release with kfree().
5096 * spec
5097 * Address of an rbd image specification pointer. Fully
5098 * initialized by this function based on parsed options.
5099 * Caller must release with rbd_spec_put().
5100 *
5101 * The options passed take this form:
5102 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5103 * where:
5104 * <mon_addrs>
5105 * A comma-separated list of one or more monitor addresses.
5106 * A monitor address is an ip address, optionally followed
5107 * by a port number (separated by a colon).
5108 * I.e.: ip1[:port1][,ip2[:port2]...]
5109 * <options>
5110 * A comma-separated list of ceph and/or rbd options.
5111 * <pool_name>
5112 * The name of the rados pool containing the rbd image.
5113 * <image_name>
5114 * The name of the image in that pool to map.
5115 * <snap_name>
5116 * An optional snapshot name. If provided, the mapping will
5117 * present data from the image at the time that snapshot was
5118 * created. The image head is used if no snapshot name is
5119 * provided. Snapshot mappings are always read-only.
a725f65e 5120 */
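/*
 * Example (illustrative) of a buffer written to /sys/bus/rbd/add:
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd foo -
 *
 * This maps the head of image "foo" from pool "rbd" using the monitor
 * at 1.2.3.4:6789; the trailing "-" is the "no snapshot" name and may
 * be omitted.
 */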
859c31df 5121static int rbd_add_parse_args(const char *buf,
dc79b113 5122 struct ceph_options **ceph_opts,
859c31df
AE
5123 struct rbd_options **opts,
5124 struct rbd_spec **rbd_spec)
e28fff26 5125{
d22f76e7 5126 size_t len;
859c31df 5127 char *options;
0ddebc0c 5128 const char *mon_addrs;
ecb4dc22 5129 char *snap_name;
0ddebc0c 5130 size_t mon_addrs_size;
859c31df 5131 struct rbd_spec *spec = NULL;
4e9afeba 5132 struct rbd_options *rbd_opts = NULL;
859c31df 5133 struct ceph_options *copts;
dc79b113 5134 int ret;
5135
5136 /* The first four tokens are required */
5137
7ef3214a 5138 len = next_token(&buf);
5139 if (!len) {
5140 rbd_warn(NULL, "no monitor address(es) provided");
5141 return -EINVAL;
5142 }
0ddebc0c 5143 mon_addrs = buf;
f28e565a 5144 mon_addrs_size = len + 1;
7ef3214a 5145 buf += len;
a725f65e 5146
dc79b113 5147 ret = -EINVAL;
5148 options = dup_token(&buf, NULL);
5149 if (!options)
dc79b113 5150 return -ENOMEM;
5151 if (!*options) {
5152 rbd_warn(NULL, "no options provided");
5153 goto out_err;
5154 }
e28fff26 5155
5156 spec = rbd_spec_alloc();
5157 if (!spec)
f28e565a 5158 goto out_mem;
5159
5160 spec->pool_name = dup_token(&buf, NULL);
5161 if (!spec->pool_name)
5162 goto out_mem;
5163 if (!*spec->pool_name) {
5164 rbd_warn(NULL, "no pool name provided");
5165 goto out_err;
5166 }
e28fff26 5167
69e7a02f 5168 spec->image_name = dup_token(&buf, NULL);
859c31df 5169 if (!spec->image_name)
f28e565a 5170 goto out_mem;
5171 if (!*spec->image_name) {
5172 rbd_warn(NULL, "no image name provided");
5173 goto out_err;
5174 }
d4b125e9 5175
5176 /*
5177 * Snapshot name is optional; default is to use "-"
5178 * (indicating the head/no snapshot).
5179 */
3feeb894 5180 len = next_token(&buf);
820a5f3e 5181 if (!len) {
5182 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5183 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5184 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5185 ret = -ENAMETOOLONG;
f28e565a 5186 goto out_err;
849b4260 5187 }
5188 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5189 if (!snap_name)
f28e565a 5190 goto out_mem;
5191 *(snap_name + len) = '\0';
5192 spec->snap_name = snap_name;
e5c35534 5193
0ddebc0c 5194 /* Initialize all rbd options to the defaults */
e28fff26 5195
5196 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5197 if (!rbd_opts)
5198 goto out_mem;
5199
5200 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5201 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5202 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5203 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d22f76e7 5204
859c31df 5205 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5206 mon_addrs + mon_addrs_size - 1,
4e9afeba 5207 parse_rbd_opts_token, rbd_opts);
5208 if (IS_ERR(copts)) {
5209 ret = PTR_ERR(copts);
5210 goto out_err;
5211 }
5212 kfree(options);
5213
5214 *ceph_opts = copts;
4e9afeba 5215 *opts = rbd_opts;
859c31df 5216 *rbd_spec = spec;
0ddebc0c 5217
dc79b113 5218 return 0;
f28e565a 5219out_mem:
dc79b113 5220 ret = -ENOMEM;
d22f76e7 5221out_err:
5222 kfree(rbd_opts);
5223 rbd_spec_put(spec);
f28e565a 5224 kfree(options);
d22f76e7 5225
dc79b113 5226 return ret;
5227}
5228
5229/*
5230 * Return pool id (>= 0) or a negative error code.
5231 */
5232static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5233{
a319bf56 5234 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5235 u64 newest_epoch;
5236 int tries = 0;
5237 int ret;
5238
5239again:
5240 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5241 if (ret == -ENOENT && tries++ < 1) {
5242 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5243 &newest_epoch);
30ba1f02
ID
5244 if (ret < 0)
5245 return ret;
5246
5247 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5248 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5249 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5250 newest_epoch,
5251 opts->mount_timeout);
30ba1f02
ID
5252 goto again;
5253 } else {
5254 /* the osdmap we have is new enough */
5255 return -ENOENT;
5256 }
5257 }
5258
5259 return ret;
5260}
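/*
 * Editorial note: the single retry above covers the common case in
 * which the client's cached osdmap predates the pool's creation.
 * The monitors are asked for the newest osdmap epoch; if our map is
 * older, a fresh map is requested and waited for before the lookup
 * is retried.  If the refreshed map still lacks the pool, -ENOENT
 * is returned.
 */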
5261
e010dd0a
ID
5262static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5263{
5264 down_write(&rbd_dev->lock_rwsem);
5265 if (__rbd_is_lock_owner(rbd_dev))
5266 rbd_unlock(rbd_dev);
5267 up_write(&rbd_dev->lock_rwsem);
5268}
5269
5270static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5271{
5272 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5273 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5274 return -EINVAL;
5275 }
5276
5277 /* FIXME: "rbd map --exclusive" should be interruptible */
5278 down_read(&rbd_dev->lock_rwsem);
5279 rbd_wait_state_locked(rbd_dev);
5280 up_read(&rbd_dev->lock_rwsem);
5281 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5282 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5283 return -EROFS;
5284 }
5285
5286 return 0;
5287}
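/*
 * Illustrative note: this lock acquisition runs only for mappings
 * that set the "exclusive" option (rbd_opts->exclusive), i.e. the
 * "rbd map --exclusive" case mentioned in the FIXME above, and it
 * requires the exclusive-lock image feature.
 */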
5288
589d30e0
AE
5289/*
5290 * An rbd format 2 image has a unique identifier, distinct from the
5291 * name given to it by the user. Internally, that identifier is
5292 * what's used to specify the names of objects related to the image.
5293 *
5294 * A special "rbd id" object is used to map an rbd image name to its
5295 * id. If that object doesn't exist, then there is no v2 rbd image
5296 * with the supplied name.
5297 *
5298 * This function records the image id in the given rbd_dev's
5299 * image_id field if it can be determined, and in that case
5300 * returns 0. If an error occurs, a negative errno is returned
5301 * and the rbd_dev's image_id field is unchanged (and should be NULL).
5302 */
5303static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5304{
5305 int ret;
5306 size_t size;
ecd4a68a 5307 CEPH_DEFINE_OID_ONSTACK(oid);
589d30e0 5308 void *response;
c0fba368 5309 char *image_id;
2f82ee54 5310
2c0d0a10
AE
5311 /*
5312 * When probing a parent image, the image id is already
5313 * known (and the image name likely is not). There's no
c0fba368
AE
5314 * need to fetch the image id again in this case. We
5315 * do still need to set the image format though.
2c0d0a10 5316 */
c0fba368
AE
5317 if (rbd_dev->spec->image_id) {
5318 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5319
2c0d0a10 5320 return 0;
c0fba368 5321 }
2c0d0a10 5322
589d30e0
AE
5323 /*
5324 * First, see if the format 2 image id object exists, and if
5325 * so, get the image's persistent id from it.
5326 */
ecd4a68a
ID
5327 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5328 rbd_dev->spec->image_name);
5329 if (ret)
5330 return ret;
5331
5332 dout("rbd id object name is %s\n", oid.name);
589d30e0
AE
5333
5334 /* Response will be an encoded string, which includes a length */
5335
5336 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5337 response = kzalloc(size, GFP_NOIO);
5338 if (!response) {
5339 ret = -ENOMEM;
5340 goto out;
5341 }
5342
c0fba368
AE
5343 /* If it doesn't exist we'll assume it's a format 1 image */
5344
ecd4a68a
ID
5345 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5346 "get_id", NULL, 0,
5347 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5348 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5349 if (ret == -ENOENT) {
5350 image_id = kstrdup("", GFP_KERNEL);
5351 ret = image_id ? 0 : -ENOMEM;
5352 if (!ret)
5353 rbd_dev->image_format = 1;
7dd440c9 5354 } else if (ret >= 0) {
c0fba368
AE
5355 void *p = response;
5356
5357 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5358 NULL, GFP_NOIO);
461f758a 5359 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5360 if (!ret)
5361 rbd_dev->image_format = 2;
c0fba368
AE
5362 }
5363
5364 if (!ret) {
5365 rbd_dev->spec->image_id = image_id;
5366 dout("image_id is %s\n", image_id);
589d30e0
AE
5367 }
5368out:
5369 kfree(response);
ecd4a68a 5370 ceph_oid_destroy(&oid);
589d30e0
AE
5371 return ret;
5372}
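/*
 * Illustrative example, assuming RBD_ID_PREFIX is "rbd_id." (per
 * rbd_types.h): for a format 2 image named "foo", the image id is
 * read from an object named "rbd_id.foo".  A format 1 image has no
 * such object, which is why -ENOENT above yields an empty image_id
 * and image_format = 1.
 */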
5373
3abef3b3
AE
5374/*
5375 * Undo whatever state changes are made by v1 or v2 header info
5376 * call.
5377 */
6fd48b3b
AE
5378static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5379{
5380 struct rbd_image_header *header;
5381
e69b8d41 5382 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5383
5384 /* Free dynamic fields from the header, then zero it out */
5385
5386 header = &rbd_dev->header;
812164f8 5387 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5388 kfree(header->snap_sizes);
5389 kfree(header->snap_names);
5390 kfree(header->object_prefix);
5391 memset(header, 0, sizeof (*header));
5392}
5393
2df3fac7 5394static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5395{
5396 int ret;
a30b71b9 5397
1e130199 5398 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5399 if (ret)
b1b5402a
AE
5400 goto out_err;
5401
2df3fac7
AE
5402 /*
5403 * Get and check the features for the image. Currently the
5404 * features are assumed to never change.
5405 */
b1b5402a 5406 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5407 if (ret)
9d475de5 5408 goto out_err;
35d489f9 5409
cc070d59
AE
5410 /* If the image supports fancy striping, get its parameters */
5411
5412 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5413 ret = rbd_dev_v2_striping_info(rbd_dev);
5414 if (ret < 0)
5415 goto out_err;
5416 }
a30b71b9 5417
7e97332e
ID
5418 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5419 ret = rbd_dev_v2_data_pool(rbd_dev);
5420 if (ret)
5421 goto out_err;
5422 }
5423
263423f8 5424 rbd_init_layout(rbd_dev);
35152979 5425 return 0;
263423f8 5426
9d475de5 5427out_err:
642a2537 5428 rbd_dev->header.features = 0;
1e130199
AE
5429 kfree(rbd_dev->header.object_prefix);
5430 rbd_dev->header.object_prefix = NULL;
9d475de5 5431 return ret;
a30b71b9
AE
5432}
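/*
 * Editorial note: "onetime" refers to header fields fetched once at
 * probe time (object prefix, features, striping parameters, data
 * pool) and assumed immutable for the life of the mapping, as
 * opposed to mutable header state that is re-read on refresh.
 */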
5433
6d69bb53
ID
5434/*
5435 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5436 * rbd_dev_image_probe() recursion depth, which means it's also the
5437 * length of the already discovered part of the parent chain.
5438 */
5439static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5440{
2f82ee54 5441 struct rbd_device *parent = NULL;
124afba2
AE
5442 int ret;
5443
5444 if (!rbd_dev->parent_spec)
5445 return 0;
124afba2 5446
6d69bb53
ID
5447 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5448 pr_info("parent chain is too long (%d)\n", depth);
5449 ret = -EINVAL;
5450 goto out_err;
5451 }
5452
1643dfa4 5453 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5454 if (!parent) {
5455 ret = -ENOMEM;
124afba2 5456 goto out_err;
1f2c6651
ID
5457 }
5458
5459 /*
5460 * Images related by parent/child relationships always share
5461 * rbd_client and spec/parent_spec, so bump their refcounts.
5462 */
5463 __rbd_get_client(rbd_dev->rbd_client);
5464 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5465
6d69bb53 5466 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5467 if (ret < 0)
5468 goto out_err;
1f2c6651 5469
124afba2 5470 rbd_dev->parent = parent;
a2acd00e 5471 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5472 return 0;
1f2c6651 5473
124afba2 5474out_err:
1f2c6651 5475 rbd_dev_unparent(rbd_dev);
1761b229 5476 rbd_dev_destroy(parent);
124afba2
AE
5477 return ret;
5478}
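/*
 * Editorial note: for a chain such as image -> parent -> grandparent,
 * this recurses once per ancestor, probing each in turn, and fails
 * with -EINVAL once the discovered chain exceeds
 * RBD_MAX_PARENT_CHAIN_LEN images.
 */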
5479
5769ed0c
ID
5480static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5481{
5482 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5483 rbd_dev_mapping_clear(rbd_dev);
5484 rbd_free_disk(rbd_dev);
5485 if (!single_major)
5486 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5487}
5488
811c6688
ID
5489/*
5490 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5491 * upon return.
5492 */
200a6a8b 5493static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5494{
83a06263 5495 int ret;
d1cf5788 5496
9b60e70b 5497 /* Record our major and minor device numbers. */
83a06263 5498
9b60e70b
ID
5499 if (!single_major) {
5500 ret = register_blkdev(0, rbd_dev->name);
5501 if (ret < 0)
1643dfa4 5502 goto err_out_unlock;
9b60e70b
ID
5503
5504 rbd_dev->major = ret;
5505 rbd_dev->minor = 0;
5506 } else {
5507 rbd_dev->major = rbd_major;
5508 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5509 }
83a06263
AE
5510
5511 /* Set up the blkdev mapping. */
5512
5513 ret = rbd_init_disk(rbd_dev);
5514 if (ret)
5515 goto err_out_blkdev;
5516
f35a4dee 5517 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
5518 if (ret)
5519 goto err_out_disk;
bc1ecc65 5520
f35a4dee 5521 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
9568c93e 5522 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
f35a4dee 5523
5769ed0c 5524 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
f35a4dee 5525 if (ret)
f5ee37bd 5526 goto err_out_mapping;
83a06263 5527
129b79d4 5528 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 5529 up_write(&rbd_dev->header_rwsem);
5769ed0c 5530 return 0;
2f82ee54 5531
f35a4dee
AE
5532err_out_mapping:
5533 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
5534err_out_disk:
5535 rbd_free_disk(rbd_dev);
5536err_out_blkdev:
9b60e70b
ID
5537 if (!single_major)
5538 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
5539err_out_unlock:
5540 up_write(&rbd_dev->header_rwsem);
83a06263
AE
5541 return ret;
5542}
5543
332bb12d
AE
5544static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5545{
5546 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 5547 int ret;
332bb12d
AE
5548
5549 /* Record the header object name for this rbd image. */
5550
5551 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
332bb12d 5552 if (rbd_dev->image_format == 1)
c41d13a3
ID
5553 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5554 spec->image_name, RBD_SUFFIX);
332bb12d 5555 else
c41d13a3
ID
5556 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5557 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 5558
c41d13a3 5559 return ret;
332bb12d
AE
5560}
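/*
 * Illustrative example, assuming RBD_SUFFIX is ".rbd" and
 * RBD_HEADER_PREFIX is "rbd_header." (per rbd_types.h): a format 1
 * image named "foo" uses header object "foo.rbd", while a format 2
 * image with id "1014b2aefa21" uses "rbd_header.1014b2aefa21".
 */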
5561
200a6a8b
AE
5562static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5563{
6fd48b3b 5564 rbd_dev_unprobe(rbd_dev);
fd22aef8
ID
5565 if (rbd_dev->opts)
5566 rbd_unregister_watch(rbd_dev);
6fd48b3b
AE
5567 rbd_dev->image_format = 0;
5568 kfree(rbd_dev->spec->image_id);
5569 rbd_dev->spec->image_id = NULL;
200a6a8b
AE
5570}
5571
a30b71b9
AE
5572/*
5573 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
5574 * device. If this image is the one being mapped (i.e., not a
5575 * parent), initiate a watch on its header object before using that
5576 * object to get detailed information about the rbd image.
a30b71b9 5577 */
6d69bb53 5578static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
5579{
5580 int ret;
5581
5582 /*
3abef3b3
AE
5583 * Get the id from the image id object. Unless there's an
5584 * error, rbd_dev->spec->image_id will be filled in with
5585 * a dynamically-allocated string, and rbd_dev->image_format
5586 * will be set to either 1 or 2.
a30b71b9
AE
5587 */
5588 ret = rbd_dev_image_id(rbd_dev);
5589 if (ret)
c0fba368 5590 return ret;
c0fba368 5591
332bb12d
AE
5592 ret = rbd_dev_header_name(rbd_dev);
5593 if (ret)
5594 goto err_out_format;
5595
6d69bb53 5596 if (!depth) {
99d16943 5597 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
5598 if (ret) {
5599 if (ret == -ENOENT)
5600 pr_info("image %s/%s does not exist\n",
5601 rbd_dev->spec->pool_name,
5602 rbd_dev->spec->image_name);
c41d13a3 5603 goto err_out_format;
1fe48023 5604 }
1f3ef788 5605 }
b644de2b 5606
a720ae09 5607 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 5608 if (ret)
b644de2b 5609 goto err_out_watch;
83a06263 5610
04077599
ID
5611 /*
5612 * If this image is the one being mapped, we have pool name and
5613 * id, image name and id, and snap name - need to fill snap id.
5614 * Otherwise this is a parent image, identified by pool, image
5615 * and snap ids - need to fill in names for those ids.
5616 */
6d69bb53 5617 if (!depth)
04077599
ID
5618 ret = rbd_spec_fill_snap_id(rbd_dev);
5619 else
5620 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
5621 if (ret) {
5622 if (ret == -ENOENT)
5623 pr_info("snap %s/%s@%s does not exist\n",
5624 rbd_dev->spec->pool_name,
5625 rbd_dev->spec->image_name,
5626 rbd_dev->spec->snap_name);
33dca39f 5627 goto err_out_probe;
1fe48023 5628 }
9bb81c9b 5629
e8f59b59
ID
5630 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5631 ret = rbd_dev_v2_parent_info(rbd_dev);
5632 if (ret)
5633 goto err_out_probe;
5634
5635 /*
5636 * Need to warn users if this image is the one being
5637 * mapped and has a parent.
5638 */
6d69bb53 5639 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
5640 rbd_warn(rbd_dev,
5641 "WARNING: kernel layering is EXPERIMENTAL!");
5642 }
5643
6d69bb53 5644 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
5645 if (ret)
5646 goto err_out_probe;
5647
5648 dout("discovered format %u image, header name is %s\n",
c41d13a3 5649 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 5650 return 0;
e8f59b59 5651
6fd48b3b
AE
5652err_out_probe:
5653 rbd_dev_unprobe(rbd_dev);
b644de2b 5654err_out_watch:
6d69bb53 5655 if (!depth)
99d16943 5656 rbd_unregister_watch(rbd_dev);
332bb12d
AE
5657err_out_format:
5658 rbd_dev->image_format = 0;
5655c4d9
AE
5659 kfree(rbd_dev->spec->image_id);
5660 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
5661 return ret;
5662}
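/*
 * Editorial summary of the probe sequence above: determine the image
 * id and format, derive the header object name, register a watch on
 * it (top-level image only, i.e. depth == 0), fetch the header,
 * resolve the snapshot id or names, read parent info for layered
 * images, then recurse via rbd_dev_probe_parent().
 */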
5663
9b60e70b
ID
5664static ssize_t do_rbd_add(struct bus_type *bus,
5665 const char *buf,
5666 size_t count)
602adf40 5667{
cb8627c7 5668 struct rbd_device *rbd_dev = NULL;
dc79b113 5669 struct ceph_options *ceph_opts = NULL;
4e9afeba 5670 struct rbd_options *rbd_opts = NULL;
859c31df 5671 struct rbd_spec *spec = NULL;
9d3997fd 5672 struct rbd_client *rbdc;
b51c83c2 5673 int rc;
602adf40
YS
5674
5675 if (!try_module_get(THIS_MODULE))
5676 return -ENODEV;
5677
602adf40 5678 /* parse add command */
859c31df 5679 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5680 if (rc < 0)
dd5ac32d 5681 goto out;
78cea76e 5682
9d3997fd
AE
5683 rbdc = rbd_get_client(ceph_opts);
5684 if (IS_ERR(rbdc)) {
5685 rc = PTR_ERR(rbdc);
0ddebc0c 5686 goto err_out_args;
9d3997fd 5687 }
602adf40 5688
602adf40 5689 /* pick the pool */
30ba1f02 5690 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
5691 if (rc < 0) {
5692 if (rc == -ENOENT)
5693 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 5694 goto err_out_client;
1fe48023 5695 }
c0cd10db 5696 spec->pool_id = (u64)rc;
859c31df 5697
d147543d 5698 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
5699 if (!rbd_dev) {
5700 rc = -ENOMEM;
bd4ba655 5701 goto err_out_client;
b51c83c2 5702 }
c53d5893
AE
5703 rbdc = NULL; /* rbd_dev now owns this */
5704 spec = NULL; /* rbd_dev now owns this */
d147543d 5705 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 5706
0d6d1e9c
MC
5707 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5708 if (!rbd_dev->config_info) {
5709 rc = -ENOMEM;
5710 goto err_out_rbd_dev;
5711 }
5712
811c6688 5713 down_write(&rbd_dev->header_rwsem);
6d69bb53 5714 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
5715 if (rc < 0) {
5716 up_write(&rbd_dev->header_rwsem);
c53d5893 5717 goto err_out_rbd_dev;
0d6d1e9c 5718 }
05fd6f6f 5719
7ce4eef7 5720 /* If we are mapping a snapshot it must be marked read-only */
7ce4eef7 5721 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9568c93e 5722 rbd_dev->opts->read_only = true;
7ce4eef7 5723
b536f69a 5724 rc = rbd_dev_device_setup(rbd_dev);
fd22aef8 5725 if (rc)
8b679ec5 5726 goto err_out_image_probe;
3abef3b3 5727
e010dd0a
ID
5728 if (rbd_dev->opts->exclusive) {
5729 rc = rbd_add_acquire_lock(rbd_dev);
5730 if (rc)
5731 goto err_out_device_setup;
3abef3b3
AE
5732 }
5733
5769ed0c
ID
5734 /* Everything's ready. Announce the disk to the world. */
5735
5736 rc = device_add(&rbd_dev->dev);
5737 if (rc)
e010dd0a 5738 goto err_out_image_lock;
5769ed0c
ID
5739
5740 add_disk(rbd_dev->disk);
5741 /* see rbd_init_disk() */
5742 blk_put_queue(rbd_dev->disk->queue);
5743
5744 spin_lock(&rbd_dev_list_lock);
5745 list_add_tail(&rbd_dev->node, &rbd_dev_list);
5746 spin_unlock(&rbd_dev_list_lock);
5747
5748 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5749 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5750 rbd_dev->header.features);
dd5ac32d
ID
5751 rc = count;
5752out:
5753 module_put(THIS_MODULE);
5754 return rc;
b536f69a 5755
e010dd0a
ID
5756err_out_image_lock:
5757 rbd_dev_image_unlock(rbd_dev);
5769ed0c
ID
5758err_out_device_setup:
5759 rbd_dev_device_release(rbd_dev);
8b679ec5
ID
5760err_out_image_probe:
5761 rbd_dev_image_release(rbd_dev);
c53d5893
AE
5762err_out_rbd_dev:
5763 rbd_dev_destroy(rbd_dev);
bd4ba655 5764err_out_client:
9d3997fd 5765 rbd_put_client(rbdc);
0ddebc0c 5766err_out_args:
859c31df 5767 rbd_spec_put(spec);
d147543d 5768 kfree(rbd_opts);
dd5ac32d 5769 goto out;
602adf40
YS
5770}
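/*
 * Illustrative usage, assuming the bus attribute is exposed as
 * /sys/bus/rbd/add: mapping a device amounts to writing the string
 * parsed by rbd_add_parse_args() to it, e.g.:
 *
 *   # echo "1.2.3.4:6789 name=admin rbd_pool my_image" > /sys/bus/rbd/add
 *
 * When the single_major module parameter is set, the equivalent
 * add_single_major attribute (backed by rbd_add_single_major()
 * below) must be used instead, since rbd_add() then returns -EINVAL.
 */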
5771
9b60e70b
ID
5772static ssize_t rbd_add(struct bus_type *bus,
5773 const char *buf,
5774 size_t count)
5775{
5776 if (single_major)
5777 return -EINVAL;
5778
5779 return do_rbd_add(bus, buf, count);
5780}
5781
5782static ssize_t rbd_add_single_major(struct bus_type *bus,
5783 const char *buf,
5784 size_t count)
5785{
5786 return do_rbd_add(bus, buf, count);
5787}
5788
05a46afd
AE
5789static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5790{
ad945fc1 5791 while (rbd_dev->parent) {
05a46afd
AE
5792 struct rbd_device *first = rbd_dev;
5793 struct rbd_device *second = first->parent;
5794 struct rbd_device *third;
5795
5796 /*
5797 * Walk down to the last parent in the chain (the one with
5798 * no grandparent) and remove it.
5799 */
5800 while (second && (third = second->parent)) {
5801 first = second;
5802 second = third;
5803 }
ad945fc1 5804 rbd_assert(second);
8ad42cd0 5805 rbd_dev_image_release(second);
8b679ec5 5806 rbd_dev_destroy(second);
ad945fc1
AE
5807 first->parent = NULL;
5808 first->parent_overlap = 0;
5809
5810 rbd_assert(first->parent_spec);
05a46afd
AE
5811 rbd_spec_put(first->parent_spec);
5812 first->parent_spec = NULL;
05a46afd
AE
5813 }
5814}
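/*
 * Editorial note: teardown proceeds deepest-ancestor-first.  For
 * image -> parent -> grandparent, the grandparent is released and
 * destroyed on the first pass and the parent on the second; each
 * pass clears the child's ->parent pointer before the loop
 * re-evaluates rbd_dev->parent.
 */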
5815
9b60e70b
ID
5816static ssize_t do_rbd_remove(struct bus_type *bus,
5817 const char *buf,
5818 size_t count)
602adf40
YS
5819{
5820 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
5821 struct list_head *tmp;
5822 int dev_id;
0276dca6 5823 char opt_buf[6];
82a442d2 5824 bool already = false;
0276dca6 5825 bool force = false;
0d8189e1 5826 int ret;
602adf40 5827
0276dca6
MC
5828 dev_id = -1;
5829 opt_buf[0] = '\0';
5830 sscanf(buf, "%d %5s", &dev_id, opt_buf);
5831 if (dev_id < 0) {
5832 pr_err("dev_id out of range\n");
602adf40 5833 return -EINVAL;
0276dca6
MC
5834 }
5835 if (opt_buf[0] != '\0') {
5836 if (!strcmp(opt_buf, "force")) {
5837 force = true;
5838 } else {
5839 pr_err("bad remove option at '%s'\n", opt_buf);
5840 return -EINVAL;
5841 }
5842 }
602adf40 5843
751cc0e3
AE
5844 ret = -ENOENT;
5845 spin_lock(&rbd_dev_list_lock);
5846 list_for_each(tmp, &rbd_dev_list) {
5847 rbd_dev = list_entry(tmp, struct rbd_device, node);
5848 if (rbd_dev->dev_id == dev_id) {
5849 ret = 0;
5850 break;
5851 }
42382b70 5852 }
751cc0e3
AE
5853 if (!ret) {
5854 spin_lock_irq(&rbd_dev->lock);
0276dca6 5855 if (rbd_dev->open_count && !force)
751cc0e3
AE
5856 ret = -EBUSY;
5857 else
82a442d2
AE
5858 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5859 &rbd_dev->flags);
751cc0e3
AE
5860 spin_unlock_irq(&rbd_dev->lock);
5861 }
5862 spin_unlock(&rbd_dev_list_lock);
82a442d2 5863 if (ret < 0 || already)
1ba0f1e7 5864 return ret;
751cc0e3 5865
0276dca6
MC
5866 if (force) {
5867 /*
5868 * Prevent new IO from being queued and wait for existing
5869 * IO to complete/fail.
5870 */
5871 blk_mq_freeze_queue(rbd_dev->disk->queue);
5872 blk_set_queue_dying(rbd_dev->disk->queue);
5873 }
5874
5769ed0c
ID
5875 del_gendisk(rbd_dev->disk);
5876 spin_lock(&rbd_dev_list_lock);
5877 list_del_init(&rbd_dev->node);
5878 spin_unlock(&rbd_dev_list_lock);
5879 device_del(&rbd_dev->dev);
fca27065 5880
e010dd0a 5881 rbd_dev_image_unlock(rbd_dev);
dd5ac32d 5882 rbd_dev_device_release(rbd_dev);
8ad42cd0 5883 rbd_dev_image_release(rbd_dev);
8b679ec5 5884 rbd_dev_destroy(rbd_dev);
1ba0f1e7 5885 return count;
602adf40
YS
5886}
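/*
 * Illustrative usage, assuming the bus attribute is exposed as
 * /sys/bus/rbd/remove: unmapping writes the device id, optionally
 * followed by "force" (parsed by the sscanf() above), e.g.:
 *
 *   # echo "0" > /sys/bus/rbd/remove
 *   # echo "0 force" > /sys/bus/rbd/remove
 *
 * "force" skips the open_count check and marks the queue dying so
 * that outstanding and future I/O fails instead of blocking removal.
 */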
5887
9b60e70b
ID
5888static ssize_t rbd_remove(struct bus_type *bus,
5889 const char *buf,
5890 size_t count)
5891{
5892 if (single_major)
5893 return -EINVAL;
5894
5895 return do_rbd_remove(bus, buf, count);
5896}
5897
5898static ssize_t rbd_remove_single_major(struct bus_type *bus,
5899 const char *buf,
5900 size_t count)
5901{
5902 return do_rbd_remove(bus, buf, count);
5903}
5904
602adf40
YS
5905/*
5906 * create control files in sysfs
dfc5606d 5907 * /sys/bus/rbd/...
602adf40
YS
5908 */
5909static int rbd_sysfs_init(void)
5910{
dfc5606d 5911 int ret;
602adf40 5912
fed4c143 5913 ret = device_register(&rbd_root_dev);
21079786 5914 if (ret < 0)
dfc5606d 5915 return ret;
602adf40 5916
fed4c143
AE
5917 ret = bus_register(&rbd_bus_type);
5918 if (ret < 0)
5919 device_unregister(&rbd_root_dev);
602adf40 5920
602adf40
YS
5921 return ret;
5922}
5923
5924static void rbd_sysfs_cleanup(void)
5925{
dfc5606d 5926 bus_unregister(&rbd_bus_type);
fed4c143 5927 device_unregister(&rbd_root_dev);
602adf40
YS
5928}
5929
1c2a9dfe
AE
5930static int rbd_slab_init(void)
5931{
5932 rbd_assert(!rbd_img_request_cache);
03d94406 5933 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
5934 if (!rbd_img_request_cache)
5935 return -ENOMEM;
5936
5937 rbd_assert(!rbd_obj_request_cache);
03d94406 5938 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
5939 if (!rbd_obj_request_cache)
5940 goto out_err;
5941
6c696d85 5942 return 0;
1c2a9dfe 5943
6c696d85 5944out_err:
868311b1
AE
5945 kmem_cache_destroy(rbd_img_request_cache);
5946 rbd_img_request_cache = NULL;
1c2a9dfe
AE
5947 return -ENOMEM;
5948}
5949
5950static void rbd_slab_exit(void)
5951{
868311b1
AE
5952 rbd_assert(rbd_obj_request_cache);
5953 kmem_cache_destroy(rbd_obj_request_cache);
5954 rbd_obj_request_cache = NULL;
5955
1c2a9dfe
AE
5956 rbd_assert(rbd_img_request_cache);
5957 kmem_cache_destroy(rbd_img_request_cache);
5958 rbd_img_request_cache = NULL;
5959}
5960
cc344fa1 5961static int __init rbd_init(void)
602adf40
YS
5962{
5963 int rc;
5964
1e32d34c
AE
5965 if (!libceph_compatible(NULL)) {
5966 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
5967 return -EINVAL;
5968 }
e1b4d96d 5969
1c2a9dfe 5970 rc = rbd_slab_init();
602adf40
YS
5971 if (rc)
5972 return rc;
e1b4d96d 5973
f5ee37bd
ID
5974 /*
5975 * The number of active work items is limited by the number of
f77303bd 5976 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
5977 */
5978 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5979 if (!rbd_wq) {
5980 rc = -ENOMEM;
5981 goto err_out_slab;
5982 }
5983
9b60e70b
ID
5984 if (single_major) {
5985 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5986 if (rbd_major < 0) {
5987 rc = rbd_major;
f5ee37bd 5988 goto err_out_wq;
9b60e70b
ID
5989 }
5990 }
5991
1c2a9dfe
AE
5992 rc = rbd_sysfs_init();
5993 if (rc)
9b60e70b
ID
5994 goto err_out_blkdev;
5995
5996 if (single_major)
5997 pr_info("loaded (major %d)\n", rbd_major);
5998 else
5999 pr_info("loaded\n");
1c2a9dfe 6000
e1b4d96d
ID
6001 return 0;
6002
9b60e70b
ID
6003err_out_blkdev:
6004 if (single_major)
6005 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6006err_out_wq:
6007 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6008err_out_slab:
6009 rbd_slab_exit();
1c2a9dfe 6010 return rc;
602adf40
YS
6011}
6012
cc344fa1 6013static void __exit rbd_exit(void)
602adf40 6014{
ffe312cf 6015 ida_destroy(&rbd_dev_id_ida);
602adf40 6016 rbd_sysfs_cleanup();
9b60e70b
ID
6017 if (single_major)
6018 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6019 destroy_workqueue(rbd_wq);
1c2a9dfe 6020 rbd_slab_exit();
602adf40
YS
6021}
6022
6023module_init(rbd_init);
6024module_exit(rbd_exit);
6025
d552c619 6026MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6027MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6028MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6029/* following authorship retained from original osdblk.c */
6030MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6031
90da258b 6032MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6033MODULE_LICENSE("GPL");