rbd: kill rbd_image_header->total_snaps
[linux-2.6-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
df111be6
AE
55/* It might be useful to have this defined elsewhere too */
56
57#define U64_MAX ((u64) (~0ULL))
58
f0f8cef5
AE
59#define RBD_DRV_NAME "rbd"
60#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
61
62#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
63
602adf40
YS
64#define RBD_MAX_SNAP_NAME_LEN 32
65#define RBD_MAX_OPT_LEN 1024
66
67#define RBD_SNAP_HEAD_NAME "-"
68
81a89793
AE
69/*
70 * An RBD device name will be "rbd#", where the "rbd" comes from
71 * RBD_DRV_NAME above, and # is a unique integer identifier.
72 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73 * enough to hold all possible device names.
74 */
602adf40 75#define DEV_NAME_LEN 32
81a89793 76#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 77
cc0538b6 78#define RBD_READ_ONLY_DEFAULT false
59c2be1e 79
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* mapped image size, in bytes */
	char *object_prefix;	/* kmalloc'd copy of the on-disk prefix */
	__u8 obj_order;		/* log2 of the per-object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;	/* snapshot ids; indices parallel
						 * snap_names/snap_sizes below */

	char *snap_names;	/* NUL-separated names, one per snapshot */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* header version reported by the osd */
};
96
/* Options parsed from the "add" command (see rbd_opts_tokens). */
struct rbd_options {
	bool read_only;		/* map the device read-only */
};
100
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;		/* shared-client reference count */
	struct list_head node;		/* entry in rbd_client_list */
};
109
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;		/* nonzero once this piece has completed */
	int rc;			/* completion result */
	u64 bytes;		/* bytes transferred */
};
118
/*
 * a collection of requests that together make up one block-layer request
 */
struct rbd_req_coll {
	int total;			/* number of status slots */
	int num_done;			/* completed-in-order count */
	struct kref kref;
	struct rbd_req_status status[0];	/* flexible trailing array */
};
128
f0f8cef5
AE
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* slot within coll->status[] */
	struct rbd_req_coll *coll;
};
140
dfc5606d
YS
/* One snapshot of the image, exposed as a sysfs device. */
struct rbd_snap {
	struct device dev;
	const char *name;
	u64 size;		/* image size at this snapshot */
	struct list_head node;	/* entry in rbd_device->snaps */
	u64 id;
};
148
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	struct rbd_options rbd_opts;
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char *image_name;
	size_t image_name_len;
	char *header_name;	/* name of the image's header object */
	char *pool_name;
	int pool_id;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	/* name of the snapshot this device reads from */
	char *snap_name;
	/* id of the snapshot this device reads from */
	u64 snap_id;		/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool snap_exists;
	bool read_only;

	struct list_head node;	/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
193
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for functions defined later in this file */
static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

/* Bus attributes: "add"/"remove" are write-only control files */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

/* Intentionally empty: rbd_root_dev is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
234
aafb230e
AE
#ifdef RBD_DEBUG
/*
 * Kernel-style assertion: report the failing expression with its
 * location, then BUG().  Wrapped in do { } while (0) so the macro
 * behaves as a single statement -- the original bare if-block would
 * misbind in an unbraced if/else ("dangling else") context.
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
dfc5606d 247
dfc5606d
YS
248static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
249{
250 return get_device(&rbd_dev->dev);
251}
252
253static void rbd_put_dev(struct rbd_device *rbd_dev)
254{
255 put_device(&rbd_dev->dev);
256}
602adf40 257
1fe5e993 258static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 259
602adf40
YS
260static int rbd_open(struct block_device *bdev, fmode_t mode)
261{
f0f8cef5 262 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 263
602adf40
YS
264 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
265 return -EROFS;
266
340c7a2b
AE
267 rbd_get_dev(rbd_dev);
268 set_device_ro(bdev, rbd_dev->read_only);
269
602adf40
YS
270 return 0;
271}
272
dfc5606d
YS
273static int rbd_release(struct gendisk *disk, fmode_t mode)
274{
275 struct rbd_device *rbd_dev = disk->private_data;
276
277 rbd_put_dev(rbd_dev);
278
279 return 0;
280}
281
602adf40
YS
/* Block-layer entry points for an rbd device. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
287
288/*
289 * Initialize an rbd client instance.
43ae4701 290 * We own *ceph_opts.
602adf40 291 */
f8c38929 292static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
293{
294 struct rbd_client *rbdc;
295 int ret = -ENOMEM;
296
297 dout("rbd_client_create\n");
298 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
299 if (!rbdc)
300 goto out_opt;
301
302 kref_init(&rbdc->kref);
303 INIT_LIST_HEAD(&rbdc->node);
304
bc534d86
AE
305 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
306
43ae4701 307 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 308 if (IS_ERR(rbdc->client))
bc534d86 309 goto out_mutex;
43ae4701 310 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
311
312 ret = ceph_open_session(rbdc->client);
313 if (ret < 0)
314 goto out_err;
315
432b8587 316 spin_lock(&rbd_client_list_lock);
602adf40 317 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 318 spin_unlock(&rbd_client_list_lock);
602adf40 319
bc534d86
AE
320 mutex_unlock(&ctl_mutex);
321
602adf40
YS
322 dout("rbd_client_create created %p\n", rbdc);
323 return rbdc;
324
325out_err:
326 ceph_destroy_client(rbdc->client);
bc534d86
AE
327out_mutex:
328 mutex_unlock(&ctl_mutex);
602adf40
YS
329 kfree(rbdc);
330out_opt:
43ae4701
AE
331 if (ceph_opts)
332 ceph_destroy_options(ceph_opts);
28f259b7 333 return ERR_PTR(ret);
602adf40
YS
334}
335
336/*
1f7ba331
AE
337 * Find a ceph client with specific addr and configuration. If
338 * found, bump its reference count.
602adf40 339 */
1f7ba331 340static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
341{
342 struct rbd_client *client_node;
1f7ba331 343 bool found = false;
602adf40 344
43ae4701 345 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
346 return NULL;
347
1f7ba331
AE
348 spin_lock(&rbd_client_list_lock);
349 list_for_each_entry(client_node, &rbd_client_list, node) {
350 if (!ceph_compare_options(ceph_opts, client_node->client)) {
351 kref_get(&client_node->kref);
352 found = true;
353 break;
354 }
355 }
356 spin_unlock(&rbd_client_list_lock);
357
358 return found ? client_node : NULL;
602adf40
YS
359}
360
59c2be1e
YS
361/*
362 * mount options
363 */
364enum {
59c2be1e
YS
365 Opt_last_int,
366 /* int args above */
367 Opt_last_string,
368 /* string args above */
cc0538b6
AE
369 Opt_read_only,
370 Opt_read_write,
371 /* Boolean args above */
372 Opt_last_bool,
59c2be1e
YS
373};
374
43ae4701 375static match_table_t rbd_opts_tokens = {
59c2be1e
YS
376 /* int args above */
377 /* string args above */
cc0538b6
AE
378 {Opt_read_only, "read_only"},
379 {Opt_read_only, "ro"}, /* Alternate spelling */
380 {Opt_read_write, "read_write"},
381 {Opt_read_write, "rw"}, /* Alternate spelling */
382 /* Boolean args above */
59c2be1e
YS
383 {-1, NULL}
384};
385
386static int parse_rbd_opts_token(char *c, void *private)
387{
43ae4701 388 struct rbd_options *rbd_opts = private;
59c2be1e
YS
389 substring_t argstr[MAX_OPT_ARGS];
390 int token, intval, ret;
391
43ae4701 392 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
393 if (token < 0)
394 return -EINVAL;
395
396 if (token < Opt_last_int) {
397 ret = match_int(&argstr[0], &intval);
398 if (ret < 0) {
399 pr_err("bad mount option arg (not int) "
400 "at '%s'\n", c);
401 return ret;
402 }
403 dout("got int token %d val %d\n", token, intval);
404 } else if (token > Opt_last_int && token < Opt_last_string) {
405 dout("got string token %d val %s\n", token,
406 argstr[0].from);
cc0538b6
AE
407 } else if (token > Opt_last_string && token < Opt_last_bool) {
408 dout("got Boolean token %d\n", token);
59c2be1e
YS
409 } else {
410 dout("got token %d\n", token);
411 }
412
413 switch (token) {
cc0538b6
AE
414 case Opt_read_only:
415 rbd_opts->read_only = true;
416 break;
417 case Opt_read_write:
418 rbd_opts->read_only = false;
419 break;
59c2be1e 420 default:
aafb230e
AE
421 rbd_assert(false);
422 break;
59c2be1e
YS
423 }
424 return 0;
425}
426
602adf40
YS
427/*
428 * Get a ceph client with specific addr and configuration, if one does
429 * not exist create it.
430 */
f8c38929
AE
431static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
432 size_t mon_addr_len, char *options)
602adf40 433{
f8c38929 434 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
43ae4701 435 struct ceph_options *ceph_opts;
f8c38929 436 struct rbd_client *rbdc;
59c2be1e 437
cc0538b6 438 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
602adf40 439
43ae4701
AE
440 ceph_opts = ceph_parse_options(options, mon_addr,
441 mon_addr + mon_addr_len,
442 parse_rbd_opts_token, rbd_opts);
f8c38929
AE
443 if (IS_ERR(ceph_opts))
444 return PTR_ERR(ceph_opts);
602adf40 445
1f7ba331 446 rbdc = rbd_client_find(ceph_opts);
602adf40 447 if (rbdc) {
602adf40 448 /* using an existing client */
43ae4701 449 ceph_destroy_options(ceph_opts);
f8c38929
AE
450 } else {
451 rbdc = rbd_client_create(ceph_opts);
452 if (IS_ERR(rbdc))
453 return PTR_ERR(rbdc);
602adf40 454 }
f8c38929 455 rbd_dev->rbd_client = rbdc;
602adf40 456
f8c38929 457 return 0;
602adf40
YS
458}
459
460/*
461 * Destroy ceph client
d23a4b3f 462 *
432b8587 463 * Caller must hold rbd_client_list_lock.
602adf40
YS
464 */
465static void rbd_client_release(struct kref *kref)
466{
467 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
468
469 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 470 spin_lock(&rbd_client_list_lock);
602adf40 471 list_del(&rbdc->node);
cd9d9f5d 472 spin_unlock(&rbd_client_list_lock);
602adf40
YS
473
474 ceph_destroy_client(rbdc->client);
475 kfree(rbdc);
476}
477
478/*
479 * Drop reference to ceph client node. If it's not referenced anymore, release
480 * it.
481 */
482static void rbd_put_client(struct rbd_device *rbd_dev)
483{
484 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
485 rbd_dev->rbd_client = NULL;
602adf40
YS
486}
487
1fec7093
YS
488/*
489 * Destroy requests collection
490 */
491static void rbd_coll_release(struct kref *kref)
492{
493 struct rbd_req_coll *coll =
494 container_of(kref, struct rbd_req_coll, kref);
495
496 dout("rbd_coll_release %p\n", coll);
497 kfree(coll);
498}
602adf40 499
8e94af8e
AE
500static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
501{
103a150f
AE
502 size_t size;
503 u32 snap_count;
504
505 /* The header has to start with the magic rbd header text */
506 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
507 return false;
508
509 /*
510 * The size of a snapshot header has to fit in a size_t, and
511 * that limits the number of snapshots.
512 */
513 snap_count = le32_to_cpu(ondisk->snap_count);
514 size = SIZE_MAX - sizeof (struct ceph_snap_context);
515 if (snap_count > size / sizeof (__le64))
516 return false;
517
518 /*
519 * Not only that, but the size of the entire the snapshot
520 * header must also be representable in a size_t.
521 */
522 size -= snap_count * sizeof (__le64);
523 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
524 return false;
525
526 return true;
8e94af8e
AE
527}
528
602adf40
YS
529/*
530 * Create a new header structure, translate header format from the on-disk
531 * header.
532 */
533static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 534 struct rbd_image_header_ondisk *ondisk)
602adf40 535{
ccece235 536 u32 snap_count;
58c17b0e 537 size_t len;
d2bb24e5 538 size_t size;
621901d6 539 u32 i;
602adf40 540
6a52325f
AE
541 memset(header, 0, sizeof (*header));
542
103a150f
AE
543 snap_count = le32_to_cpu(ondisk->snap_count);
544
58c17b0e
AE
545 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
546 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 547 if (!header->object_prefix)
602adf40 548 return -ENOMEM;
58c17b0e
AE
549 memcpy(header->object_prefix, ondisk->object_prefix, len);
550 header->object_prefix[len] = '\0';
00f1f36f 551
602adf40 552 if (snap_count) {
f785cc1d
AE
553 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
554
621901d6
AE
555 /* Save a copy of the snapshot names */
556
f785cc1d
AE
557 if (snap_names_len > (u64) SIZE_MAX)
558 return -EIO;
559 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 560 if (!header->snap_names)
6a52325f 561 goto out_err;
f785cc1d
AE
562 /*
563 * Note that rbd_dev_v1_header_read() guarantees
564 * the ondisk buffer we're working with has
565 * snap_names_len bytes beyond the end of the
566 * snapshot id array, this memcpy() is safe.
567 */
568 memcpy(header->snap_names, &ondisk->snaps[snap_count],
569 snap_names_len);
6a52325f 570
621901d6
AE
571 /* Record each snapshot's size */
572
d2bb24e5
AE
573 size = snap_count * sizeof (*header->snap_sizes);
574 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 575 if (!header->snap_sizes)
6a52325f 576 goto out_err;
621901d6
AE
577 for (i = 0; i < snap_count; i++)
578 header->snap_sizes[i] =
579 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 580 } else {
ccece235 581 WARN_ON(ondisk->snap_names_len);
602adf40
YS
582 header->snap_names = NULL;
583 header->snap_sizes = NULL;
584 }
849b4260 585
602adf40
YS
586 header->image_size = le64_to_cpu(ondisk->image_size);
587 header->obj_order = ondisk->options.order;
588 header->crypt_type = ondisk->options.crypt_type;
589 header->comp_type = ondisk->options.comp_type;
6a52325f 590
621901d6
AE
591 /* Allocate and fill in the snapshot context */
592
6a52325f
AE
593 size = sizeof (struct ceph_snap_context);
594 size += snap_count * sizeof (header->snapc->snaps[0]);
595 header->snapc = kzalloc(size, GFP_KERNEL);
596 if (!header->snapc)
597 goto out_err;
602adf40
YS
598
599 atomic_set(&header->snapc->nref, 1);
505cbb9b 600 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 601 header->snapc->num_snaps = snap_count;
621901d6
AE
602 for (i = 0; i < snap_count; i++)
603 header->snapc->snaps[i] =
604 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
605
606 return 0;
607
6a52325f 608out_err:
849b4260 609 kfree(header->snap_sizes);
ccece235 610 header->snap_sizes = NULL;
602adf40 611 kfree(header->snap_names);
ccece235 612 header->snap_names = NULL;
6a52325f
AE
613 kfree(header->object_prefix);
614 header->object_prefix = NULL;
ccece235 615
00f1f36f 616 return -ENOMEM;
602adf40
YS
617}
618
602adf40
YS
619static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
620 u64 *seq, u64 *size)
621{
622 int i;
623 char *p = header->snap_names;
624
c9aadfe7
AE
625 rbd_assert(header->snapc != NULL);
626 for (i = 0; i < header->snapc->num_snaps; i++) {
00f1f36f 627 if (!strcmp(snap_name, p)) {
602adf40 628
00f1f36f 629 /* Found it. Pass back its id and/or size */
602adf40 630
00f1f36f
AE
631 if (seq)
632 *seq = header->snapc->snaps[i];
633 if (size)
634 *size = header->snap_sizes[i];
635 return i;
636 }
637 p += strlen(p) + 1; /* Skip ahead to the next name */
638 }
639 return -ENOENT;
602adf40
YS
640}
641
0ce1a794 642static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 643{
78dc447d 644 int ret;
602adf40 645
0ce1a794 646 down_write(&rbd_dev->header_rwsem);
602adf40 647
0ce1a794 648 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 649 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 650 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 651 rbd_dev->snap_exists = false;
cc0538b6 652 rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
602adf40 653 if (size)
78dc447d 654 *size = rbd_dev->header.image_size;
602adf40 655 } else {
78dc447d
AE
656 u64 snap_id = 0;
657
658 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
659 &snap_id, size);
602adf40
YS
660 if (ret < 0)
661 goto done;
78dc447d 662 rbd_dev->snap_id = snap_id;
e88a36ec 663 rbd_dev->snap_exists = true;
cc0538b6 664 rbd_dev->read_only = true; /* No choice for snapshots */
602adf40
YS
665 }
666
667 ret = 0;
668done:
0ce1a794 669 up_write(&rbd_dev->header_rwsem);
602adf40
YS
670 return ret;
671}
672
673static void rbd_header_free(struct rbd_image_header *header)
674{
849b4260 675 kfree(header->object_prefix);
d78fd7ae 676 header->object_prefix = NULL;
602adf40 677 kfree(header->snap_sizes);
d78fd7ae 678 header->snap_sizes = NULL;
849b4260 679 kfree(header->snap_names);
d78fd7ae 680 header->snap_names = NULL;
d1d25646 681 ceph_put_snap_context(header->snapc);
d78fd7ae 682 header->snapc = NULL;
602adf40
YS
683}
684
65ccfe21 685static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
602adf40 686{
65ccfe21
AE
687 char *name;
688 u64 segment;
689 int ret;
602adf40 690
65ccfe21
AE
691 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
692 if (!name)
693 return NULL;
694 segment = offset >> rbd_dev->header.obj_order;
695 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
696 rbd_dev->header.object_prefix, segment);
697 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
698 pr_err("error formatting segment name for #%llu (%d)\n",
699 segment, ret);
700 kfree(name);
701 name = NULL;
702 }
602adf40 703
65ccfe21
AE
704 return name;
705}
602adf40 706
65ccfe21
AE
707static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
708{
709 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 710
65ccfe21
AE
711 return offset & (segment_size - 1);
712}
713
714static u64 rbd_segment_length(struct rbd_device *rbd_dev,
715 u64 offset, u64 length)
716{
717 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
718
719 offset &= segment_size - 1;
720
aafb230e 721 rbd_assert(length <= U64_MAX - offset);
65ccfe21
AE
722 if (offset + length > segment_size)
723 length = segment_size - offset;
724
725 return length;
602adf40
YS
726}
727
1fec7093
YS
728static int rbd_get_num_segments(struct rbd_image_header *header,
729 u64 ofs, u64 len)
730{
df111be6
AE
731 u64 start_seg;
732 u64 end_seg;
733
734 if (!len)
735 return 0;
736 if (len - 1 > U64_MAX - ofs)
737 return -ERANGE;
738
739 start_seg = ofs >> header->obj_order;
740 end_seg = (ofs + len - 1) >> header->obj_order;
741
1fec7093
YS
742 return end_seg - start_seg + 1;
743}
744
029bcbd8
JD
745/*
746 * returns the size of an object in the image
747 */
748static u64 rbd_obj_bytes(struct rbd_image_header *header)
749{
750 return 1 << header->obj_order;
751}
752
602adf40
YS
753/*
754 * bio helpers
755 */
756
757static void bio_chain_put(struct bio *chain)
758{
759 struct bio *tmp;
760
761 while (chain) {
762 tmp = chain;
763 chain = chain->bi_next;
764 bio_put(tmp);
765 }
766}
767
768/*
769 * zeros a bio chain, starting at specific offset
770 */
771static void zero_bio_chain(struct bio *chain, int start_ofs)
772{
773 struct bio_vec *bv;
774 unsigned long flags;
775 void *buf;
776 int i;
777 int pos = 0;
778
779 while (chain) {
780 bio_for_each_segment(bv, chain, i) {
781 if (pos + bv->bv_len > start_ofs) {
782 int remainder = max(start_ofs - pos, 0);
783 buf = bvec_kmap_irq(bv, &flags);
784 memset(buf + remainder, 0,
785 bv->bv_len - remainder);
85b5aaa6 786 bvec_kunmap_irq(buf, &flags);
602adf40
YS
787 }
788 pos += bv->bv_len;
789 }
790
791 chain = chain->bi_next;
792 }
793}
794
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until "len" bytes are covered.  *old is
 * advanced past the consumed bios; *next points at the remainder of
 * the request (the second half of a split, or the next unconsumed
 * bio).  Returns the new chain, or NULL on allocation failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;
	int total = 0;

	/* Release any bio_pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		/* Append the clone to the new chain */
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
869
870/*
871 * helpers for osd request op vectors.
872 */
57cfc106
AE
873static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
874 int opcode, u32 payload_len)
602adf40 875{
57cfc106
AE
876 struct ceph_osd_req_op *ops;
877
878 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
879 if (!ops)
880 return NULL;
881
882 ops[0].op = opcode;
883
602adf40
YS
884 /*
885 * op extent offset and length will be set later on
886 * in calc_raw_layout()
887 */
57cfc106
AE
888 ops[0].payload_len = payload_len;
889
890 return ops;
602adf40
YS
891}
892
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
897
1fec7093
YS
/*
 * Record completion of one piece of a collected request, and complete
 * the block-layer request for every consecutive finished piece
 * starting at num_done.  Pieces must be reported back to the block
 * layer in order, which is why completion is deferred until all
 * earlier slots are done.  Takes the request queue lock.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* Uncollected request: complete it directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* Find how far the run of consecutively-done slots extends */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Each completed slot drops one collection reference */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
935
936static void rbd_coll_end_req(struct rbd_request *req,
937 int ret, u64 len)
938{
939 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
940}
941
602adf40
YS
/*
 * Send ceph osd request
 *
 * Builds and submits one OSD request for [ofs, ofs + len) of the named
 * object.  With a NULL rbd_cb the call is synchronous: it waits for
 * completion and drops the request reference itself.  Otherwise rbd_cb
 * is invoked by the osd client when the reply arrives.  If linger_req
 * is non-NULL the request is registered to be resent across osd map
 * changes and passed back to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Still must complete this piece of the collection */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* One object per "file": stripe unit == object size */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		/* Re-sent automatically across osd map changes */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait for completion and drop our ref */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1053
/*
 * Ceph osd op callback
 *
 * Parses the osd reply, normalizes read results (a missing object or a
 * short read yields zero-filled data), completes the request's slot in
 * its collection, and drops the request and its bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Reading a nonexistent object: treat as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero the remainder of the buffer */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1093
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1098
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the data, performs the request via
 * rbd_do_request() with no callback (so it waits for completion), and
 * for reads copies the result into "buf".  Returns the osd result (for
 * reads, the byte count) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	/* On a successful read, ret is the number of bytes returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1142
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image-relative extent (ofs, len) onto its containing
 * object segment, builds a single read or write op for it, and
 * submits it; completion is delivered through rbd_req_cb() into the
 * caller's request collection slot (coll, coll_index).
 *
 * The caller must have already split the I/O so the extent does not
 * cross a segment boundary (asserted below).
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;

        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);

        /* only writes carry a data payload */
        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        rbd_assert(seg_len == len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}
1195
1196/*
1197 * Request async osd write
1198 */
1199static int rbd_req_write(struct request *rq,
1200 struct rbd_device *rbd_dev,
1201 struct ceph_snap_context *snapc,
1202 u64 ofs, u64 len,
1fec7093
YS
1203 struct bio *bio,
1204 struct rbd_req_coll *coll,
1205 int coll_index)
602adf40
YS
1206{
1207 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1208 CEPH_OSD_OP_WRITE,
1209 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1210 ofs, len, bio, coll, coll_index);
602adf40
YS
1211}
1212
1213/*
1214 * Request async osd read
1215 */
1216static int rbd_req_read(struct request *rq,
1217 struct rbd_device *rbd_dev,
1218 u64 snapid,
1219 u64 ofs, u64 len,
1fec7093
YS
1220 struct bio *bio,
1221 struct rbd_req_coll *coll,
1222 int coll_index)
602adf40
YS
1223{
1224 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1225 snapid,
602adf40
YS
1226 CEPH_OSD_OP_READ,
1227 CEPH_OSD_FLAG_READ,
1fec7093 1228 ofs, len, bio, coll, coll_index);
602adf40
YS
1229}
1230
/*
 * Request sync osd read
 *
 * Synchronously read (ofs, len) of @object_name at snapshot @snapid
 * into @buf.  @ver, if non-NULL, receives the object version.
 * Returns bytes read or negative errno.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                          u64 snapid,
                          const char *object_name,
                          u64 ofs, u64 len,
                          char *buf,
                          u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
        if (!ops)
                return -ENOMEM;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               snapid,
                               CEPH_OSD_FLAG_READ,
                               ops, object_name, ofs, len, buf, NULL, ver);
        rbd_destroy_ops(ops);

        return ret;
}
1256
/*
 * Acknowledge a notification received on the header object watch.
 * (The original comment said "Request sync osd watch", but the op
 * issued here is CEPH_OSD_OP_NOTIFY_ACK, sent from rbd_watch_cb().)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        /* NOTE(review): notify_id is stored without cpu_to_le64() here,
         * unlike .ver above — presumably already little-endian from the
         * osd reply; confirm against the osd_client notify path. */
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          rbd_dev->header_name, 0, 0, NULL,
                          NULL, 0,
                          CEPH_OSD_FLAG_READ,
                          ops,
                          NULL, 0,
                          rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}
1286
/*
 * Callback invoked when a notification arrives on the watched header
 * object: refresh the in-core header (picking up resizes/snapshot
 * changes), then acknowledge the notification so the notifier's wait
 * can complete.  @data is the rbd_device registered in
 * rbd_req_sync_watch().
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
        rc = rbd_refresh_header(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);

        /* ack even if the refresh failed, so the notifier isn't stuck */
        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
1306
/*
 * Request sync osd watch
 *
 * Registers a watch on the image's header object so header changes
 * (resize, snapshot create/delete) are pushed to rbd_watch_cb().
 * The osd event and lingering request are stored on @rbd_dev; on
 * failure both are torn down again.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;        /* 1 == register the watch */

        /* linger_req keeps the watch alive across osd map changes */
        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1350
79e3057c
YS
/*
 * Request sync osd unwatch
 *
 * Mirror of rbd_req_sync_watch(): sends a WATCH op with flag = 0 to
 * deregister, then cancels and clears the osd event.  Assumes
 * rbd_dev->watch_event was previously set up by rbd_req_sync_watch().
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;        /* 0 == deregister the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        /* event is torn down regardless of the unwatch op's outcome */
        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}
1380
59c2be1e 1381struct rbd_notify_info {
0ce1a794 1382 struct rbd_device *rbd_dev;
59c2be1e
YS
1383};
1384
1385static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1386{
0ce1a794
AE
1387 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1388 if (!rbd_dev)
59c2be1e
YS
1389 return;
1390
bd919d45
AE
1391 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1392 rbd_dev->header_name, (unsigned long long) notify_id,
1393 (unsigned int) opcode);
59c2be1e
YS
1394}
1395
1396/*
1397 * Request sync osd notify
1398 */
4cb16250 1399static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
59c2be1e
YS
1400{
1401 struct ceph_osd_req_op *ops;
0ce1a794 1402 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
59c2be1e
YS
1403 struct ceph_osd_event *event;
1404 struct rbd_notify_info info;
1405 int payload_len = sizeof(u32) + sizeof(u32);
1406 int ret;
1407
57cfc106
AE
1408 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1409 if (!ops)
1410 return -ENOMEM;
59c2be1e 1411
0ce1a794 1412 info.rbd_dev = rbd_dev;
59c2be1e
YS
1413
1414 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1415 (void *)&info, &event);
1416 if (ret < 0)
1417 goto fail;
1418
1419 ops[0].watch.ver = 1;
1420 ops[0].watch.flag = 1;
1421 ops[0].watch.cookie = event->cookie;
1422 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1423 ops[0].watch.timeout = 12;
1424
0ce1a794 1425 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1426 CEPH_NOSNAP,
59c2be1e
YS
1427 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1428 ops,
4cb16250
AE
1429 rbd_dev->header_name,
1430 0, 0, NULL, NULL, NULL);
59c2be1e
YS
1431 if (ret < 0)
1432 goto fail_event;
1433
1434 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1435 dout("ceph_osdc_wait_event returned %d\n", ret);
1436 rbd_destroy_ops(ops);
1437 return 0;
1438
1439fail_event:
1440 ceph_osdc_cancel_event(event);
1441fail:
1442 rbd_destroy_ops(ops);
1443 return ret;
1444}
1445
602adf40
YS
/*
 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL) on
 * @object_name, passing @data (@len bytes) as input.  @ver, if
 * non-NULL, receives the resulting object version.
 * (The original comment said "Request sync osd read", which described
 * the wrong operation.)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret;

        /* payload holds class name + method name + input data */
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                                    class_name_len + method_name_len + len);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
                               object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1486
1fec7093
YS
1487static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1488{
1489 struct rbd_req_coll *coll =
1490 kzalloc(sizeof(struct rbd_req_coll) +
1491 sizeof(struct rbd_req_status) * num_reqs,
1492 GFP_ATOMIC);
1493
1494 if (!coll)
1495 return NULL;
1496 coll->total = num_reqs;
1497 kref_init(&coll->kref);
1498 return coll;
1499}
1500
602adf40
YS
/*
 * block device queue callback
 *
 * Drains the request queue, splitting each filesystem request into
 * per-object-segment OSD reads or writes.  Completion across the
 * segments is coordinated through an rbd_req_coll; the block request
 * is finished when all segments have completed.
 *
 * Called with q->queue_lock held; the lock is dropped around the
 * blocking/allocating work and re-taken before fetching the next
 * request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* drop the queue lock for the blocking work below */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* mapped snapshot may have been deleted under us */
                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* take a snapc reference so writes use a stable context */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_segment_length(rbd_dev, ofs, size);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                /* fail just this segment; others proceed */
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                /* drop the submission reference taken at allocation */
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1619
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be merged into the
 * current bio without crossing an object (2^obj_order byte) boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;
        sector_t sector;
        unsigned int bio_sectors;
        int max;

        /* object size in sectors; objects are a power of two in size */
        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        /* space remaining before the next object boundary, in bytes */
        max = (chunk_sectors - ((sector & (chunk_sectors - 1))
                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        /* always accept the first bvec of an empty bio */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
1646
/*
 * Release the gendisk and header state for @rbd_dev.  Safe to call
 * before the disk was ever created (disk == NULL).  Teardown order
 * matters: unregister the disk before destroying its queue, and drop
 * the gendisk reference last.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_header_free(&rbd_dev->header);

        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}
1662
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header. Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings. Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* no-op on the first pass (ondisk == NULL) */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->image_name);
                        goto out_err;
                }

                /* re-read if the snapshot count changed under us */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1734
/*
 * Re-read the on-disk header and decode it into @header.
 * @header->obj_version is set only when decoding succeeds.
 * (Original comment had a typo: "reload the ondisk the header".)
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        struct rbd_image_header_ondisk *ondisk;
        u64 ver = 0;
        int ret;

        ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
        if (IS_ERR(ondisk))
                return PTR_ERR(ondisk);
        ret = rbd_header_from_disk(header, ondisk);
        if (ret >= 0)
                header->obj_version = ver;
        kfree(ondisk);

        return ret;
}
1755
1756/*
1757 * create a snapshot
1758 */
0ce1a794 1759static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1760 const char *snap_name,
1761 gfp_t gfp_flags)
1762{
1763 int name_len = strlen(snap_name);
1764 u64 new_snapid;
1765 int ret;
916d4d67 1766 void *data, *p, *e;
1dbb4399 1767 struct ceph_mon_client *monc;
602adf40
YS
1768
1769 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1770 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1771 return -EINVAL;
1772
0ce1a794
AE
1773 monc = &rbd_dev->rbd_client->client->monc;
1774 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1775 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1776 if (ret < 0)
1777 return ret;
1778
1779 data = kmalloc(name_len + 16, gfp_flags);
1780 if (!data)
1781 return -ENOMEM;
1782
916d4d67
SW
1783 p = data;
1784 e = data + name_len + 16;
602adf40 1785
916d4d67
SW
1786 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1787 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1788
0bed54dc 1789 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1790 "rbd", "snap_add",
d67d4be5 1791 data, p - data, NULL);
602adf40 1792
916d4d67 1793 kfree(data);
602adf40 1794
505cbb9b 1795 return ret < 0 ? ret : 0;
602adf40
YS
1796bad:
1797 return -ERANGE;
1798}
1799
dfc5606d
YS
1800static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1801{
1802 struct rbd_snap *snap;
a0593290 1803 struct rbd_snap *next;
dfc5606d 1804
a0593290 1805 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1806 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1807}
1808
602adf40
YS
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header from the osds and swaps the fresh fields into
 * rbd_dev->header under header_rwsem, freeing the replaced snapshot
 * arrays and snap context.  Updates the disk capacity when mapping
 * the head, and resyncs the snapshot device list.  Caller must hold
 * ctl_mutex (see rbd_refresh_header()).
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = rbd_dev_snap_devs_update(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1854
1fe5e993
AE
/*
 * Locked wrapper for __rbd_refresh_header(): serializes header
 * refreshes against the control path via ctl_mutex.  The _nested
 * annotation is needed because this can be reached from paths that
 * already interact with ctl_mutex lockdep classes.
 */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        ret = __rbd_refresh_header(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);

        return ret;
}
1865
602adf40
YS
/*
 * Set up the gendisk and request queue for @rbd_dev: read the image
 * header, build the snapshot list, resolve the mapped snapshot (and
 * its size), then create, configure and register the disk.
 * Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = rbd_dev_snap_devs_update(rbd_dev);
        if (rc)
                return rc;

        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}
1937
dfc5606d
YS
/*
   sysfs

   Per-device attribute show/store implementations.  Each one resolves
   the rbd_device from the struct device embedded in it and formats a
   single value, per sysfs convention.
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}

/* Mapped size in bytes; reads capacity under header_rwsem. */
static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/* Block device major number assigned to this mapping. */
static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}

/* Ceph client instance id, e.g. "client4120". */
static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                        ceph_client_id(rbd_dev->rbd_client->client));
}

/* Name of the rados pool holding the image. */
static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

/* Numeric id of the rados pool holding the image. */
static ssize_t rbd_pool_id_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

/* rbd image name. */
static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}

/* Name of the currently mapped snapshot (or the head marker). */
static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

/*
 * Write-only "refresh" attribute: any write triggers a header
 * re-read.  Returns the write size on success, the refresh error
 * otherwise.
 */
static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_refresh_header(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}
602adf40 2022
dfc5606d
YS
/* Attribute definitions wiring the show/store handlers above. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

/* Empty release: rbd_device lifetime is managed elsewhere. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};


/*
   sysfs - snapshots

   Each snapshot gets a child device exposing its size and id.
*/

/* Snapshot size in bytes at the time the snapshot was taken. */
static ssize_t rbd_snap_size_show(struct device *dev,
                                  struct device_attribute *attr,
                                  char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

/* Ceph snapshot id. */
static ssize_t rbd_snap_id_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

/* Device release: frees the rbd_snap (and its name) on last put. */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2117
/*
 * Unlink @snap from the device's snapshot list and unregister its
 * sysfs device; the final put frees it via rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        device_unregister(&snap->dev);
}

/*
 * Register the sysfs device for @snap as a child of @parent
 * (the rbd device), named "snap_<name>".
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
                                  struct device *parent)
{
        struct device *dev = &snap->dev;
        int ret;

        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
        ret = device_register(dev);

        return ret;
}

/*
 * Create an rbd_snap for entry @i of the header's snapshot arrays,
 * registering its sysfs device if the parent rbd device is already
 * registered.  Returns the new snap or a pointer-coded errno; the
 * caller is responsible for linking it into rbd_dev->snaps.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                              int i, const char *name)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->size = rbd_dev->header.snap_sizes[i];
        snap->id = rbd_dev->header.snapc->snaps[i];
        if (device_is_registered(&rbd_dev->dev)) {
                ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                if (ret < 0)
                        goto err;
        }

        return snap;

err:
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}
2170
2171/*
35938150
AE
2172 * Scan the rbd device's current snapshot list and compare it to the
2173 * newly-received snapshot context. Remove any existing snapshots
2174 * not present in the new snapshot context. Add a new snapshot for
2175 * any snaphots in the snapshot context not in the current list.
2176 * And verify there are no changes to snapshots we already know
2177 * about.
2178 *
2179 * Assumes the snapshots in the snapshot context are sorted by
2180 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2181 * are also maintained in that order.)
dfc5606d 2182 */
9fcbb800 2183static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev)
dfc5606d 2184{
35938150
AE
2185 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2186 const u32 snap_count = snapc->num_snaps;
2187 char *snap_name = rbd_dev->header.snap_names;
2188 struct list_head *head = &rbd_dev->snaps;
2189 struct list_head *links = head->next;
2190 u32 index = 0;
dfc5606d 2191
9fcbb800 2192 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
35938150
AE
2193 while (index < snap_count || links != head) {
2194 u64 snap_id;
2195 struct rbd_snap *snap;
dfc5606d 2196
35938150
AE
2197 snap_id = index < snap_count ? snapc->snaps[index]
2198 : CEPH_NOSNAP;
2199 snap = links != head ? list_entry(links, struct rbd_snap, node)
2200 : NULL;
aafb230e 2201 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 2202
35938150
AE
2203 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2204 struct list_head *next = links->next;
dfc5606d 2205
35938150 2206 /* Existing snapshot not in the new snap context */
dfc5606d 2207
35938150 2208 if (rbd_dev->snap_id == snap->id)
e88a36ec 2209 rbd_dev->snap_exists = false;
35938150 2210 __rbd_remove_snap_dev(snap);
9fcbb800
AE
2211 dout("%ssnap id %llu has been removed\n",
2212 rbd_dev->snap_id == snap->id ? "mapped " : "",
2213 (unsigned long long) snap->id);
35938150
AE
2214
2215 /* Done with this list entry; advance */
2216
2217 links = next;
dfc5606d
YS
2218 continue;
2219 }
35938150 2220
9fcbb800
AE
2221 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2222 (unsigned long long) snap_id);
35938150
AE
2223 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2224 struct rbd_snap *new_snap;
2225
2226 /* We haven't seen this snapshot before */
2227
2228 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2229 snap_name);
9fcbb800
AE
2230 if (IS_ERR(new_snap)) {
2231 int err = PTR_ERR(new_snap);
2232
2233 dout(" failed to add dev, error %d\n", err);
2234
2235 return err;
2236 }
35938150
AE
2237
2238 /* New goes before existing, or at end of list */
2239
9fcbb800 2240 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
2241 if (snap)
2242 list_add_tail(&new_snap->node, &snap->node);
2243 else
523f3258 2244 list_add_tail(&new_snap->node, head);
35938150
AE
2245 } else {
2246 /* Already have this one */
2247
9fcbb800
AE
2248 dout(" already present\n");
2249
aafb230e
AE
2250 rbd_assert(snap->size ==
2251 rbd_dev->header.snap_sizes[index]);
2252 rbd_assert(!strcmp(snap->name, snap_name));
35938150
AE
2253
2254 /* Done with this list entry; advance */
2255
2256 links = links->next;
dfc5606d 2257 }
35938150
AE
2258
2259 /* Advance to the next entry in the snapshot context */
2260
2261 index++;
2262 snap_name += strlen(snap_name) + 1;
dfc5606d 2263 }
9fcbb800 2264 dout("%s: done\n", __func__);
dfc5606d
YS
2265
2266 return 0;
2267}
2268
dfc5606d
YS
2269static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2270{
f0f8cef5 2271 int ret;
dfc5606d
YS
2272 struct device *dev;
2273 struct rbd_snap *snap;
2274
2275 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2276 dev = &rbd_dev->dev;
2277
2278 dev->bus = &rbd_bus_type;
2279 dev->type = &rbd_device_type;
2280 dev->parent = &rbd_root_dev;
2281 dev->release = rbd_dev_release;
de71a297 2282 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d
YS
2283 ret = device_register(dev);
2284 if (ret < 0)
f0f8cef5 2285 goto out;
dfc5606d
YS
2286
2287 list_for_each_entry(snap, &rbd_dev->snaps, node) {
14e7085d 2288 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d 2289 if (ret < 0)
602adf40
YS
2290 break;
2291 }
f0f8cef5 2292out:
dfc5606d
YS
2293 mutex_unlock(&ctl_mutex);
2294 return ret;
602adf40
YS
2295}
2296
dfc5606d
YS
/*
 * Unregister an rbd device from the bus.  The device's release
 * callback (rbd_dev_release, set in rbd_bus_add_dev()) performs the
 * actual teardown when the last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2301
59c2be1e
YS
2302static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2303{
2304 int ret, rc;
2305
2306 do {
0e6f322d 2307 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2308 if (ret == -ERANGE) {
1fe5e993 2309 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2310 if (rc < 0)
2311 return rc;
2312 }
2313 } while (ret == -ERANGE);
2314
2315 return ret;
2316}
2317
/* Highest device id handed out so far; ids start at 1 (see rbd_dev_id_get()) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
2319
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic increment from 0 yields ids starting at 1 */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	/* Publish the device on the global list under the list lock */
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
b7f23c36 2334
1ddbe94e 2335/*
499afd5b
AE
2336 * Remove an rbd_dev from the global list, and record that its
2337 * identifier is no longer in use.
1ddbe94e 2338 */
e2839308 2339static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2340{
d184f6bf 2341 struct list_head *tmp;
de71a297 2342 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2343 int max_id;
2344
aafb230e 2345 rbd_assert(rbd_id > 0);
499afd5b 2346
e2839308
AE
2347 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2348 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
2349 spin_lock(&rbd_dev_list_lock);
2350 list_del_init(&rbd_dev->node);
d184f6bf
AE
2351
2352 /*
2353 * If the id being "put" is not the current maximum, there
2354 * is nothing special we need to do.
2355 */
e2839308 2356 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
2357 spin_unlock(&rbd_dev_list_lock);
2358 return;
2359 }
2360
2361 /*
2362 * We need to update the current maximum id. Search the
2363 * list to find out what it is. We're more likely to find
2364 * the maximum at the end, so search the list backward.
2365 */
2366 max_id = 0;
2367 list_for_each_prev(tmp, &rbd_dev_list) {
2368 struct rbd_device *rbd_dev;
2369
2370 rbd_dev = list_entry(tmp, struct rbd_device, node);
2371 if (rbd_id > max_id)
2372 max_id = rbd_id;
2373 }
499afd5b 2374 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2375
1ddbe94e 2376 /*
e2839308 2377 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
2378 * which case it now accurately reflects the new maximum.
2379 * Be careful not to overwrite the maximum value in that
2380 * case.
1ddbe94e 2381 */
e2839308
AE
2382 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2383 dout(" max dev id has been reset\n");
b7f23c36
AE
2384}
2385
e28fff26
AE
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, spaces);	/* skip to start of token */
	*buf = start;

	return strcspn(start, spaces);		/* token length */
}
2404
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;

	return token_len;
}
2434
ea3352f4
AE
2435/*
2436 * Finds the next token in *buf, dynamically allocates a buffer big
2437 * enough to hold a copy of it, and copies the token into the new
2438 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2439 * that a duplicate buffer is created even for a zero-length token.
2440 *
2441 * Returns a pointer to the newly-allocated duplicate, or a null
2442 * pointer if memory for the duplicate was not available. If
2443 * the lenp argument is a non-null pointer, the length of the token
2444 * (not including the '\0') is returned in *lenp.
2445 *
2446 * If successful, the *buf pointer will be updated to point beyond
2447 * the end of the found token.
2448 *
2449 * Note: uses GFP_KERNEL for allocation.
2450 */
2451static inline char *dup_token(const char **buf, size_t *lenp)
2452{
2453 char *dup;
2454 size_t len;
2455
2456 len = next_token(buf);
2457 dup = kmalloc(len + 1, GFP_KERNEL);
2458 if (!dup)
2459 return NULL;
2460
2461 memcpy(dup, *buf, len);
2462 *(dup + len) = '\0';
2463 *buf += len;
2464
2465 if (lenp)
2466 *lenp = len;
2467
2468 return dup;
2469}
2470
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 *
 * Returns 0 on success; -EINVAL for a missing or oversized token,
 * -ENOMEM on allocation failure (with all partially-filled name
 * fields freed and reset to NULL).
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	/* Monitor addresses are returned as a pointer into buf, not copied */
	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free everything allocated so far and reset the fields */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2553
59c2be1e
YS
/*
 * Handle a write to /sys/bus/rbd/add: parse monitor addresses,
 * options, pool, image and snapshot names from buf; create the ceph
 * client; register the block device and its sysfs representation;
 * then set up the disk and the header watch.  Returns count on
 * success or a negative errno.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size includes the trailing '\0'; pass the bare length */
	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
	if (rc < 0)
		goto err_put_id;

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name is only set once parsing succeeded that far */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_dev_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2660
de71a297 2661static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2662{
2663 struct list_head *tmp;
2664 struct rbd_device *rbd_dev;
2665
e124a82f 2666 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2667 list_for_each(tmp, &rbd_dev_list) {
2668 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2669 if (rbd_dev->dev_id == dev_id) {
e124a82f 2670 spin_unlock(&rbd_dev_list_lock);
602adf40 2671 return rbd_dev;
e124a82f 2672 }
602adf40 2673 }
e124a82f 2674 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2675 return NULL;
2676}
2677
/*
 * Release callback for an rbd device (installed by rbd_bus_add_dev()).
 * Runs once the device has been unregistered and its last reference
 * dropped: tears down the header watch, drops the ceph client, frees
 * the disk, returns the device id, and frees the rbd_device itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request, if one was set up */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2708
dfc5606d
YS
/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the
 * decimal id of the device to tear down; all of its snapshots are
 * removed and the device is unregistered (rbd_dev_release() then
 * finishes the cleanup).  Returns count on success, -ENOENT if no
 * device has the given id, or -EINVAL for an unparseable id.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* Hold ctl_mutex across lookup and removal */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
2743
dfc5606d
YS
/*
 * Handle a write to a device's snapshot-creation sysfs attribute:
 * create a snapshot with the name given in buf, refresh the header,
 * and (best-effort) notify watchers.  Returns count on success or a
 * negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* NOTE(review): with size == count, snprintf keeps at most
	 * count - 1 characters, dropping the final byte of buf
	 * (normally the sysfs trailing newline) — confirm input
	 * always carries that newline. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2784
602adf40
YS
2785/*
2786 * create control files in sysfs
dfc5606d 2787 * /sys/bus/rbd/...
602adf40
YS
2788 */
2789static int rbd_sysfs_init(void)
2790{
dfc5606d 2791 int ret;
602adf40 2792
fed4c143 2793 ret = device_register(&rbd_root_dev);
21079786 2794 if (ret < 0)
dfc5606d 2795 return ret;
602adf40 2796
fed4c143
AE
2797 ret = bus_register(&rbd_bus_type);
2798 if (ret < 0)
2799 device_unregister(&rbd_root_dev);
602adf40 2800
602adf40
YS
2801 return ret;
2802}
2803
/* Undo rbd_sysfs_init(): unregister the bus first, then the root device. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2809
/*
 * Module init: set up the sysfs bus and root device so rbd devices
 * can be added through /sys/bus/rbd.
 */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2820
/* Module exit: tear down the sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2825
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");