rbd: more cleanup in rbd_header_from_disk()
[linux-2.6-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
59c2be1e
YS
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
74/*
75 * block device image metadata (in-memory version)
76 */
77struct rbd_image_header {
78 u64 image_size;
849b4260 79 char *object_prefix;
602adf40
YS
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
602adf40 83 struct ceph_snap_context *snapc;
602adf40
YS
84 u32 total_snaps;
85
86 char *snap_names;
87 u64 *snap_sizes;
59c2be1e
YS
88
89 u64 obj_version;
90};
91
92struct rbd_options {
93 int notify_timeout;
602adf40
YS
94};
95
96/*
f0f8cef5 97 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
98 */
99struct rbd_client {
100 struct ceph_client *client;
59c2be1e 101 struct rbd_options *rbd_opts;
602adf40
YS
102 struct kref kref;
103 struct list_head node;
104};
105
106/*
f0f8cef5 107 * a request completion status
602adf40 108 */
1fec7093
YS
109struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
113};
114
115/*
116 * a collection of requests
117 */
118struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
602adf40
YS
123};
124
f0f8cef5
AE
125/*
126 * a single io request
127 */
128struct rbd_request {
129 struct request *rq; /* blk layer request */
130 struct bio *bio; /* cloned bio */
131 struct page **pages; /* list of used pages */
132 u64 len;
133 int coll_index;
134 struct rbd_req_coll *coll;
135};
136
dfc5606d
YS
137struct rbd_snap {
138 struct device dev;
139 const char *name;
3591538f 140 u64 size;
dfc5606d
YS
141 struct list_head node;
142 u64 id;
143};
144
602adf40
YS
145/*
146 * a single device
147 */
148struct rbd_device {
de71a297 149 int dev_id; /* blkdev unique id */
602adf40
YS
150
151 int major; /* blkdev assigned major */
152 struct gendisk *disk; /* blkdev's gendisk and rq */
153 struct request_queue *q;
154
602adf40
YS
155 struct rbd_client *rbd_client;
156
157 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
158
159 spinlock_t lock; /* queue lock */
160
161 struct rbd_image_header header;
0bed54dc
AE
162 char *image_name;
163 size_t image_name_len;
164 char *header_name;
d22f76e7 165 char *pool_name;
9bb2f334 166 int pool_id;
602adf40 167
59c2be1e
YS
168 struct ceph_osd_event *watch_event;
169 struct ceph_osd_request *watch_request;
170
c666601a
JD
171 /* protects updating the header */
172 struct rw_semaphore header_rwsem;
e88a36ec 173 /* name of the snapshot this device reads from */
820a5f3e 174 char *snap_name;
e88a36ec 175 /* id of the snapshot this device reads from */
77dfe99f 176 u64 snap_id; /* current snapshot id */
e88a36ec
JD
177 /* whether the snap_id this device reads from still exists */
178 bool snap_exists;
179 int read_only;
602adf40
YS
180
181 struct list_head node;
dfc5606d
YS
182
183 /* list of snapshots */
184 struct list_head snaps;
185
186 /* sysfs related */
187 struct device dev;
188};
189
602adf40 190static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 191
602adf40 192static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
193static DEFINE_SPINLOCK(rbd_dev_list_lock);
194
432b8587
AE
195static LIST_HEAD(rbd_client_list); /* clients */
196static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 197
dfc5606d
YS
198static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199static void rbd_dev_release(struct device *dev);
dfc5606d
YS
200static ssize_t rbd_snap_add(struct device *dev,
201 struct device_attribute *attr,
202 const char *buf,
203 size_t count);
14e7085d 204static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 205
f0f8cef5
AE
206static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207 size_t count);
208static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
209 size_t count);
210
211static struct bus_attribute rbd_bus_attrs[] = {
212 __ATTR(add, S_IWUSR, NULL, rbd_add),
213 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
214 __ATTR_NULL
215};
216
217static struct bus_type rbd_bus_type = {
218 .name = "rbd",
219 .bus_attrs = rbd_bus_attrs,
220};
221
222static void rbd_root_dev_release(struct device *dev)
223{
224}
225
226static struct device rbd_root_dev = {
227 .init_name = "rbd",
228 .release = rbd_root_dev_release,
229};
230
dfc5606d 231
dfc5606d
YS
232static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233{
234 return get_device(&rbd_dev->dev);
235}
236
237static void rbd_put_dev(struct rbd_device *rbd_dev)
238{
239 put_device(&rbd_dev->dev);
240}
602adf40 241
1fe5e993 242static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 243
602adf40
YS
244static int rbd_open(struct block_device *bdev, fmode_t mode)
245{
f0f8cef5 246 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 247
602adf40
YS
248 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
249 return -EROFS;
250
340c7a2b
AE
251 rbd_get_dev(rbd_dev);
252 set_device_ro(bdev, rbd_dev->read_only);
253
602adf40
YS
254 return 0;
255}
256
dfc5606d
YS
257static int rbd_release(struct gendisk *disk, fmode_t mode)
258{
259 struct rbd_device *rbd_dev = disk->private_data;
260
261 rbd_put_dev(rbd_dev);
262
263 return 0;
264}
265
602adf40
YS
266static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
268 .open = rbd_open,
dfc5606d 269 .release = rbd_release,
602adf40
YS
270};
271
272/*
273 * Initialize an rbd client instance.
43ae4701 274 * We own *ceph_opts.
602adf40 275 */
43ae4701 276static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 277 struct rbd_options *rbd_opts)
602adf40
YS
278{
279 struct rbd_client *rbdc;
280 int ret = -ENOMEM;
281
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284 if (!rbdc)
285 goto out_opt;
286
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
289
bc534d86
AE
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
43ae4701 292 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 293 if (IS_ERR(rbdc->client))
bc534d86 294 goto out_mutex;
43ae4701 295 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
296
297 ret = ceph_open_session(rbdc->client);
298 if (ret < 0)
299 goto out_err;
300
59c2be1e
YS
301 rbdc->rbd_opts = rbd_opts;
302
432b8587 303 spin_lock(&rbd_client_list_lock);
602adf40 304 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 305 spin_unlock(&rbd_client_list_lock);
602adf40 306
bc534d86
AE
307 mutex_unlock(&ctl_mutex);
308
602adf40
YS
309 dout("rbd_client_create created %p\n", rbdc);
310 return rbdc;
311
312out_err:
313 ceph_destroy_client(rbdc->client);
bc534d86
AE
314out_mutex:
315 mutex_unlock(&ctl_mutex);
602adf40
YS
316 kfree(rbdc);
317out_opt:
43ae4701
AE
318 if (ceph_opts)
319 ceph_destroy_options(ceph_opts);
28f259b7 320 return ERR_PTR(ret);
602adf40
YS
321}
322
323/*
1f7ba331
AE
324 * Find a ceph client with specific addr and configuration. If
325 * found, bump its reference count.
602adf40 326 */
1f7ba331 327static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
328{
329 struct rbd_client *client_node;
1f7ba331 330 bool found = false;
602adf40 331
43ae4701 332 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
333 return NULL;
334
1f7ba331
AE
335 spin_lock(&rbd_client_list_lock);
336 list_for_each_entry(client_node, &rbd_client_list, node) {
337 if (!ceph_compare_options(ceph_opts, client_node->client)) {
338 kref_get(&client_node->kref);
339 found = true;
340 break;
341 }
342 }
343 spin_unlock(&rbd_client_list_lock);
344
345 return found ? client_node : NULL;
602adf40
YS
346}
347
59c2be1e
YS
348/*
349 * mount options
350 */
351enum {
352 Opt_notify_timeout,
353 Opt_last_int,
354 /* int args above */
355 Opt_last_string,
356 /* string args above */
357};
358
43ae4701 359static match_table_t rbd_opts_tokens = {
59c2be1e
YS
360 {Opt_notify_timeout, "notify_timeout=%d"},
361 /* int args above */
362 /* string args above */
363 {-1, NULL}
364};
365
366static int parse_rbd_opts_token(char *c, void *private)
367{
43ae4701 368 struct rbd_options *rbd_opts = private;
59c2be1e
YS
369 substring_t argstr[MAX_OPT_ARGS];
370 int token, intval, ret;
371
43ae4701 372 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
373 if (token < 0)
374 return -EINVAL;
375
376 if (token < Opt_last_int) {
377 ret = match_int(&argstr[0], &intval);
378 if (ret < 0) {
379 pr_err("bad mount option arg (not int) "
380 "at '%s'\n", c);
381 return ret;
382 }
383 dout("got int token %d val %d\n", token, intval);
384 } else if (token > Opt_last_int && token < Opt_last_string) {
385 dout("got string token %d val %s\n", token,
386 argstr[0].from);
387 } else {
388 dout("got token %d\n", token);
389 }
390
391 switch (token) {
392 case Opt_notify_timeout:
43ae4701 393 rbd_opts->notify_timeout = intval;
59c2be1e
YS
394 break;
395 default:
396 BUG_ON(token);
397 }
398 return 0;
399}
400
602adf40
YS
401/*
402 * Get a ceph client with specific addr and configuration, if one does
403 * not exist create it.
404 */
5214ecc4
AE
405static struct rbd_client *rbd_get_client(const char *mon_addr,
406 size_t mon_addr_len,
407 char *options)
602adf40
YS
408{
409 struct rbd_client *rbdc;
43ae4701 410 struct ceph_options *ceph_opts;
59c2be1e
YS
411 struct rbd_options *rbd_opts;
412
413 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
414 if (!rbd_opts)
d720bcb0 415 return ERR_PTR(-ENOMEM);
59c2be1e
YS
416
417 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 418
43ae4701
AE
419 ceph_opts = ceph_parse_options(options, mon_addr,
420 mon_addr + mon_addr_len,
421 parse_rbd_opts_token, rbd_opts);
422 if (IS_ERR(ceph_opts)) {
d720bcb0 423 kfree(rbd_opts);
43ae4701 424 return ERR_CAST(ceph_opts);
ee57741c 425 }
602adf40 426
1f7ba331 427 rbdc = rbd_client_find(ceph_opts);
602adf40 428 if (rbdc) {
602adf40 429 /* using an existing client */
43ae4701 430 ceph_destroy_options(ceph_opts);
e6994d3d
AE
431 kfree(rbd_opts);
432
d720bcb0 433 return rbdc;
602adf40 434 }
602adf40 435
43ae4701 436 rbdc = rbd_client_create(ceph_opts, rbd_opts);
d720bcb0
AE
437 if (IS_ERR(rbdc))
438 kfree(rbd_opts);
602adf40 439
d720bcb0 440 return rbdc;
602adf40
YS
441}
442
443/*
444 * Destroy ceph client
d23a4b3f 445 *
432b8587 446 * Caller must hold rbd_client_list_lock.
602adf40
YS
447 */
448static void rbd_client_release(struct kref *kref)
449{
450 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451
452 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 453 spin_lock(&rbd_client_list_lock);
602adf40 454 list_del(&rbdc->node);
cd9d9f5d 455 spin_unlock(&rbd_client_list_lock);
602adf40
YS
456
457 ceph_destroy_client(rbdc->client);
59c2be1e 458 kfree(rbdc->rbd_opts);
602adf40
YS
459 kfree(rbdc);
460}
461
462/*
463 * Drop reference to ceph client node. If it's not referenced anymore, release
464 * it.
465 */
466static void rbd_put_client(struct rbd_device *rbd_dev)
467{
468 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
469 rbd_dev->rbd_client = NULL;
602adf40
YS
470}
471
1fec7093
YS
472/*
473 * Destroy requests collection
474 */
475static void rbd_coll_release(struct kref *kref)
476{
477 struct rbd_req_coll *coll =
478 container_of(kref, struct rbd_req_coll, kref);
479
480 dout("rbd_coll_release %p\n", coll);
481 kfree(coll);
482}
602adf40 483
8e94af8e
AE
484static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
485{
103a150f
AE
486 size_t size;
487 u32 snap_count;
488
489 /* The header has to start with the magic rbd header text */
490 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
491 return false;
492
493 /*
494 * The size of a snapshot header has to fit in a size_t, and
495 * that limits the number of snapshots.
496 */
497 snap_count = le32_to_cpu(ondisk->snap_count);
498 size = SIZE_MAX - sizeof (struct ceph_snap_context);
499 if (snap_count > size / sizeof (__le64))
500 return false;
501
502 /*
503 * Not only that, but the size of the entire the snapshot
504 * header must also be representable in a size_t.
505 */
506 size -= snap_count * sizeof (__le64);
507 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
508 return false;
509
510 return true;
8e94af8e
AE
511}
512
602adf40
YS
513/*
514 * Create a new header structure, translate header format from the on-disk
515 * header.
516 */
517static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 518 struct rbd_image_header_ondisk *ondisk)
602adf40 519{
ccece235 520 u32 snap_count;
58c17b0e 521 size_t len;
d2bb24e5 522 size_t size;
621901d6 523 u32 i;
602adf40 524
6a52325f
AE
525 memset(header, 0, sizeof (*header));
526
103a150f
AE
527 snap_count = le32_to_cpu(ondisk->snap_count);
528
58c17b0e
AE
529 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
530 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 531 if (!header->object_prefix)
602adf40 532 return -ENOMEM;
58c17b0e
AE
533 memcpy(header->object_prefix, ondisk->object_prefix, len);
534 header->object_prefix[len] = '\0';
00f1f36f 535
602adf40 536 if (snap_count) {
f785cc1d
AE
537 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
538
621901d6
AE
539 /* Save a copy of the snapshot names */
540
f785cc1d
AE
541 if (snap_names_len > (u64) SIZE_MAX)
542 return -EIO;
543 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 544 if (!header->snap_names)
6a52325f 545 goto out_err;
f785cc1d
AE
546 /*
547 * Note that rbd_dev_v1_header_read() guarantees
548 * the ondisk buffer we're working with has
549 * snap_names_len bytes beyond the end of the
550 * snapshot id array, this memcpy() is safe.
551 */
552 memcpy(header->snap_names, &ondisk->snaps[snap_count],
553 snap_names_len);
6a52325f 554
621901d6
AE
555 /* Record each snapshot's size */
556
d2bb24e5
AE
557 size = snap_count * sizeof (*header->snap_sizes);
558 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 559 if (!header->snap_sizes)
6a52325f 560 goto out_err;
621901d6
AE
561 for (i = 0; i < snap_count; i++)
562 header->snap_sizes[i] =
563 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 564 } else {
ccece235 565 WARN_ON(ondisk->snap_names_len);
602adf40
YS
566 header->snap_names = NULL;
567 header->snap_sizes = NULL;
568 }
849b4260 569
602adf40
YS
570 header->image_size = le64_to_cpu(ondisk->image_size);
571 header->obj_order = ondisk->options.order;
572 header->crypt_type = ondisk->options.crypt_type;
573 header->comp_type = ondisk->options.comp_type;
6a52325f
AE
574 header->total_snaps = snap_count;
575
621901d6
AE
576 /* Allocate and fill in the snapshot context */
577
6a52325f
AE
578 size = sizeof (struct ceph_snap_context);
579 size += snap_count * sizeof (header->snapc->snaps[0]);
580 header->snapc = kzalloc(size, GFP_KERNEL);
581 if (!header->snapc)
582 goto out_err;
602adf40
YS
583
584 atomic_set(&header->snapc->nref, 1);
505cbb9b 585 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 586 header->snapc->num_snaps = snap_count;
621901d6
AE
587 for (i = 0; i < snap_count; i++)
588 header->snapc->snaps[i] =
589 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
590
591 return 0;
592
6a52325f 593out_err:
849b4260 594 kfree(header->snap_sizes);
ccece235 595 header->snap_sizes = NULL;
602adf40 596 kfree(header->snap_names);
ccece235 597 header->snap_names = NULL;
6a52325f
AE
598 kfree(header->object_prefix);
599 header->object_prefix = NULL;
ccece235 600
00f1f36f 601 return -ENOMEM;
602adf40
YS
602}
603
602adf40
YS
604static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
605 u64 *seq, u64 *size)
606{
607 int i;
608 char *p = header->snap_names;
609
00f1f36f
AE
610 for (i = 0; i < header->total_snaps; i++) {
611 if (!strcmp(snap_name, p)) {
602adf40 612
00f1f36f 613 /* Found it. Pass back its id and/or size */
602adf40 614
00f1f36f
AE
615 if (seq)
616 *seq = header->snapc->snaps[i];
617 if (size)
618 *size = header->snap_sizes[i];
619 return i;
620 }
621 p += strlen(p) + 1; /* Skip ahead to the next name */
622 }
623 return -ENOENT;
602adf40
YS
624}
625
0ce1a794 626static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 627{
78dc447d 628 int ret;
602adf40 629
0ce1a794 630 down_write(&rbd_dev->header_rwsem);
602adf40 631
0ce1a794 632 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 633 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 634 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 635 rbd_dev->snap_exists = false;
0ce1a794 636 rbd_dev->read_only = 0;
602adf40 637 if (size)
78dc447d 638 *size = rbd_dev->header.image_size;
602adf40 639 } else {
78dc447d
AE
640 u64 snap_id = 0;
641
642 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
643 &snap_id, size);
602adf40
YS
644 if (ret < 0)
645 goto done;
78dc447d 646 rbd_dev->snap_id = snap_id;
e88a36ec 647 rbd_dev->snap_exists = true;
0ce1a794 648 rbd_dev->read_only = 1;
602adf40
YS
649 }
650
651 ret = 0;
652done:
0ce1a794 653 up_write(&rbd_dev->header_rwsem);
602adf40
YS
654 return ret;
655}
656
657static void rbd_header_free(struct rbd_image_header *header)
658{
849b4260 659 kfree(header->object_prefix);
d78fd7ae 660 header->object_prefix = NULL;
602adf40 661 kfree(header->snap_sizes);
d78fd7ae 662 header->snap_sizes = NULL;
849b4260 663 kfree(header->snap_names);
d78fd7ae 664 header->snap_names = NULL;
d1d25646 665 ceph_put_snap_context(header->snapc);
d78fd7ae 666 header->snapc = NULL;
602adf40
YS
667}
668
669/*
670 * get the actual striped segment name, offset and length
671 */
672static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 673 const char *object_prefix,
602adf40
YS
674 u64 ofs, u64 len,
675 char *seg_name, u64 *segofs)
676{
677 u64 seg = ofs >> header->obj_order;
678
679 if (seg_name)
680 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 681 "%s.%012llx", object_prefix, seg);
602adf40
YS
682
683 ofs = ofs & ((1 << header->obj_order) - 1);
684 len = min_t(u64, len, (1 << header->obj_order) - ofs);
685
686 if (segofs)
687 *segofs = ofs;
688
689 return len;
690}
691
1fec7093
YS
692static int rbd_get_num_segments(struct rbd_image_header *header,
693 u64 ofs, u64 len)
694{
695 u64 start_seg = ofs >> header->obj_order;
696 u64 end_seg = (ofs + len - 1) >> header->obj_order;
697 return end_seg - start_seg + 1;
698}
699
029bcbd8
JD
700/*
701 * returns the size of an object in the image
702 */
703static u64 rbd_obj_bytes(struct rbd_image_header *header)
704{
705 return 1 << header->obj_order;
706}
707
602adf40
YS
708/*
709 * bio helpers
710 */
711
712static void bio_chain_put(struct bio *chain)
713{
714 struct bio *tmp;
715
716 while (chain) {
717 tmp = chain;
718 chain = chain->bi_next;
719 bio_put(tmp);
720 }
721}
722
723/*
724 * zeros a bio chain, starting at specific offset
725 */
726static void zero_bio_chain(struct bio *chain, int start_ofs)
727{
728 struct bio_vec *bv;
729 unsigned long flags;
730 void *buf;
731 int i;
732 int pos = 0;
733
734 while (chain) {
735 bio_for_each_segment(bv, chain, i) {
736 if (pos + bv->bv_len > start_ofs) {
737 int remainder = max(start_ofs - pos, 0);
738 buf = bvec_kmap_irq(bv, &flags);
739 memset(buf + remainder, 0,
740 bv->bv_len - remainder);
85b5aaa6 741 bvec_kunmap_irq(buf, &flags);
602adf40
YS
742 }
743 pos += bv->bv_len;
744 }
745
746 chain = chain->bi_next;
747 }
748}
749
750/*
751 * bio_chain_clone - clone a chain of bios up to a certain length.
752 * might return a bio_pair that will need to be released.
753 */
754static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
755 struct bio_pair **bp,
756 int len, gfp_t gfpmask)
757{
758 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
759 int total = 0;
760
761 if (*bp) {
762 bio_pair_release(*bp);
763 *bp = NULL;
764 }
765
766 while (old_chain && (total < len)) {
767 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
768 if (!tmp)
769 goto err_out;
770
771 if (total + old_chain->bi_size > len) {
772 struct bio_pair *bp;
773
774 /*
775 * this split can only happen with a single paged bio,
776 * split_bio will BUG_ON if this is not the case
777 */
778 dout("bio_chain_clone split! total=%d remaining=%d"
bd919d45
AE
779 "bi_size=%u\n",
780 total, len - total, old_chain->bi_size);
602adf40
YS
781
782 /* split the bio. We'll release it either in the next
783 call, or it will have to be released outside */
593a9e7b 784 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
785 if (!bp)
786 goto err_out;
787
788 __bio_clone(tmp, &bp->bio1);
789
790 *next = &bp->bio2;
791 } else {
792 __bio_clone(tmp, old_chain);
793 *next = old_chain->bi_next;
794 }
795
796 tmp->bi_bdev = NULL;
797 gfpmask &= ~__GFP_WAIT;
798 tmp->bi_next = NULL;
799
800 if (!new_chain) {
801 new_chain = tail = tmp;
802 } else {
803 tail->bi_next = tmp;
804 tail = tmp;
805 }
806 old_chain = old_chain->bi_next;
807
808 total += tmp->bi_size;
809 }
810
811 BUG_ON(total < len);
812
813 if (tail)
814 tail->bi_next = NULL;
815
816 *old = old_chain;
817
818 return new_chain;
819
820err_out:
821 dout("bio_chain_clone with err\n");
822 bio_chain_put(new_chain);
823 return NULL;
824}
825
826/*
827 * helpers for osd request op vectors.
828 */
57cfc106
AE
829static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
830 int opcode, u32 payload_len)
602adf40 831{
57cfc106
AE
832 struct ceph_osd_req_op *ops;
833
834 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
835 if (!ops)
836 return NULL;
837
838 ops[0].op = opcode;
839
602adf40
YS
840 /*
841 * op extent offset and length will be set later on
842 * in calc_raw_layout()
843 */
57cfc106
AE
844 ops[0].payload_len = payload_len;
845
846 return ops;
602adf40
YS
847}
848
849static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
850{
851 kfree(ops);
852}
853
1fec7093
YS
854static void rbd_coll_end_req_index(struct request *rq,
855 struct rbd_req_coll *coll,
856 int index,
857 int ret, u64 len)
858{
859 struct request_queue *q;
860 int min, max, i;
861
bd919d45
AE
862 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
863 coll, index, ret, (unsigned long long) len);
1fec7093
YS
864
865 if (!rq)
866 return;
867
868 if (!coll) {
869 blk_end_request(rq, ret, len);
870 return;
871 }
872
873 q = rq->q;
874
875 spin_lock_irq(q->queue_lock);
876 coll->status[index].done = 1;
877 coll->status[index].rc = ret;
878 coll->status[index].bytes = len;
879 max = min = coll->num_done;
880 while (max < coll->total && coll->status[max].done)
881 max++;
882
883 for (i = min; i<max; i++) {
884 __blk_end_request(rq, coll->status[i].rc,
885 coll->status[i].bytes);
886 coll->num_done++;
887 kref_put(&coll->kref, rbd_coll_release);
888 }
889 spin_unlock_irq(q->queue_lock);
890}
891
892static void rbd_coll_end_req(struct rbd_request *req,
893 int ret, u64 len)
894{
895 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
896}
897
602adf40
YS
898/*
899 * Send ceph osd request
900 */
901static int rbd_do_request(struct request *rq,
0ce1a794 902 struct rbd_device *rbd_dev,
602adf40
YS
903 struct ceph_snap_context *snapc,
904 u64 snapid,
aded07ea 905 const char *object_name, u64 ofs, u64 len,
602adf40
YS
906 struct bio *bio,
907 struct page **pages,
908 int num_pages,
909 int flags,
910 struct ceph_osd_req_op *ops,
1fec7093
YS
911 struct rbd_req_coll *coll,
912 int coll_index,
602adf40 913 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
914 struct ceph_msg *msg),
915 struct ceph_osd_request **linger_req,
916 u64 *ver)
602adf40
YS
917{
918 struct ceph_osd_request *req;
919 struct ceph_file_layout *layout;
920 int ret;
921 u64 bno;
922 struct timespec mtime = CURRENT_TIME;
923 struct rbd_request *req_data;
924 struct ceph_osd_request_head *reqhead;
1dbb4399 925 struct ceph_osd_client *osdc;
602adf40 926
602adf40 927 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
928 if (!req_data) {
929 if (coll)
930 rbd_coll_end_req_index(rq, coll, coll_index,
931 -ENOMEM, len);
932 return -ENOMEM;
933 }
934
935 if (coll) {
936 req_data->coll = coll;
937 req_data->coll_index = coll_index;
938 }
602adf40 939
bd919d45
AE
940 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
941 (unsigned long long) ofs, (unsigned long long) len);
602adf40 942
0ce1a794 943 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
944 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
945 false, GFP_NOIO, pages, bio);
4ad12621 946 if (!req) {
4ad12621 947 ret = -ENOMEM;
602adf40
YS
948 goto done_pages;
949 }
950
951 req->r_callback = rbd_cb;
952
953 req_data->rq = rq;
954 req_data->bio = bio;
955 req_data->pages = pages;
956 req_data->len = len;
957
958 req->r_priv = req_data;
959
960 reqhead = req->r_request->front.iov_base;
961 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
962
aded07ea 963 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
964 req->r_oid_len = strlen(req->r_oid);
965
966 layout = &req->r_file_layout;
967 memset(layout, 0, sizeof(*layout));
968 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
969 layout->fl_stripe_count = cpu_to_le32(1);
970 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 971 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
972 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
973 req, ops);
602adf40
YS
974
975 ceph_osdc_build_request(req, ofs, &len,
976 ops,
977 snapc,
978 &mtime,
979 req->r_oid, req->r_oid_len);
602adf40 980
59c2be1e 981 if (linger_req) {
1dbb4399 982 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
983 *linger_req = req;
984 }
985
1dbb4399 986 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
987 if (ret < 0)
988 goto done_err;
989
990 if (!rbd_cb) {
1dbb4399 991 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
992 if (ver)
993 *ver = le64_to_cpu(req->r_reassert_version.version);
bd919d45
AE
994 dout("reassert_ver=%llu\n",
995 (unsigned long long)
996 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
997 ceph_osdc_put_request(req);
998 }
999 return ret;
1000
1001done_err:
1002 bio_chain_put(req_data->bio);
1003 ceph_osdc_put_request(req);
1004done_pages:
1fec7093 1005 rbd_coll_end_req(req_data, ret, len);
602adf40 1006 kfree(req_data);
602adf40
YS
1007 return ret;
1008}
1009
1010/*
1011 * Ceph osd op callback
1012 */
1013static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1014{
1015 struct rbd_request *req_data = req->r_priv;
1016 struct ceph_osd_reply_head *replyhead;
1017 struct ceph_osd_op *op;
1018 __s32 rc;
1019 u64 bytes;
1020 int read_op;
1021
1022 /* parse reply */
1023 replyhead = msg->front.iov_base;
1024 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1025 op = (void *)(replyhead + 1);
1026 rc = le32_to_cpu(replyhead->result);
1027 bytes = le64_to_cpu(op->extent.length);
895cfcc8 1028 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
602adf40 1029
bd919d45
AE
1030 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1031 (unsigned long long) bytes, read_op, (int) rc);
602adf40
YS
1032
1033 if (rc == -ENOENT && read_op) {
1034 zero_bio_chain(req_data->bio, 0);
1035 rc = 0;
1036 } else if (rc == 0 && read_op && bytes < req_data->len) {
1037 zero_bio_chain(req_data->bio, bytes);
1038 bytes = req_data->len;
1039 }
1040
1fec7093 1041 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1042
1043 if (req_data->bio)
1044 bio_chain_put(req_data->bio);
1045
1046 ceph_osdc_put_request(req);
1047 kfree(req_data);
1048}
1049
59c2be1e
YS
1050static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1051{
1052 ceph_osdc_put_request(req);
1053}
1054
602adf40
YS
1055/*
1056 * Do a synchronous ceph osd operation
1057 */
0ce1a794 1058static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1059 struct ceph_snap_context *snapc,
1060 u64 snapid,
602adf40 1061 int flags,
913d2fdc 1062 struct ceph_osd_req_op *ops,
aded07ea 1063 const char *object_name,
602adf40 1064 u64 ofs, u64 len,
59c2be1e
YS
1065 char *buf,
1066 struct ceph_osd_request **linger_req,
1067 u64 *ver)
602adf40
YS
1068{
1069 int ret;
1070 struct page **pages;
1071 int num_pages;
913d2fdc
AE
1072
1073 BUG_ON(ops == NULL);
602adf40
YS
1074
1075 num_pages = calc_pages_for(ofs , len);
1076 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1077 if (IS_ERR(pages))
1078 return PTR_ERR(pages);
602adf40 1079
0ce1a794 1080 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1081 object_name, ofs, len, NULL,
602adf40
YS
1082 pages, num_pages,
1083 flags,
1084 ops,
1fec7093 1085 NULL, 0,
59c2be1e
YS
1086 NULL,
1087 linger_req, ver);
602adf40 1088 if (ret < 0)
913d2fdc 1089 goto done;
602adf40
YS
1090
1091 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1092 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1093
602adf40
YS
1094done:
1095 ceph_release_page_vector(pages, num_pages);
1096 return ret;
1097}
1098
1099/*
1100 * Do an asynchronous ceph osd operation
1101 */
1102static int rbd_do_op(struct request *rq,
0ce1a794 1103 struct rbd_device *rbd_dev,
602adf40
YS
1104 struct ceph_snap_context *snapc,
1105 u64 snapid,
d1f57ea6 1106 int opcode, int flags,
602adf40 1107 u64 ofs, u64 len,
1fec7093
YS
1108 struct bio *bio,
1109 struct rbd_req_coll *coll,
1110 int coll_index)
602adf40
YS
1111{
1112 char *seg_name;
1113 u64 seg_ofs;
1114 u64 seg_len;
1115 int ret;
1116 struct ceph_osd_req_op *ops;
1117 u32 payload_len;
1118
1119 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1120 if (!seg_name)
1121 return -ENOMEM;
1122
1123 seg_len = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1124 rbd_dev->header.object_prefix,
602adf40
YS
1125 ofs, len,
1126 seg_name, &seg_ofs);
602adf40
YS
1127
1128 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1129
57cfc106
AE
1130 ret = -ENOMEM;
1131 ops = rbd_create_rw_ops(1, opcode, payload_len);
1132 if (!ops)
602adf40
YS
1133 goto done;
1134
1135 /* we've taken care of segment sizes earlier when we
1136 cloned the bios. We should never have a segment
1137 truncated at this point */
1138 BUG_ON(seg_len < len);
1139
1140 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1141 seg_name, seg_ofs, seg_len,
1142 bio,
1143 NULL, 0,
1144 flags,
1145 ops,
1fec7093 1146 coll, coll_index,
59c2be1e 1147 rbd_req_cb, 0, NULL);
11f77002
SW
1148
1149 rbd_destroy_ops(ops);
602adf40
YS
1150done:
1151 kfree(seg_name);
1152 return ret;
1153}
1154
1155/*
1156 * Request async osd write
1157 */
1158static int rbd_req_write(struct request *rq,
1159 struct rbd_device *rbd_dev,
1160 struct ceph_snap_context *snapc,
1161 u64 ofs, u64 len,
1fec7093
YS
1162 struct bio *bio,
1163 struct rbd_req_coll *coll,
1164 int coll_index)
602adf40
YS
1165{
1166 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1167 CEPH_OSD_OP_WRITE,
1168 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1169 ofs, len, bio, coll, coll_index);
602adf40
YS
1170}
1171
1172/*
1173 * Request async osd read
1174 */
1175static int rbd_req_read(struct request *rq,
1176 struct rbd_device *rbd_dev,
1177 u64 snapid,
1178 u64 ofs, u64 len,
1fec7093
YS
1179 struct bio *bio,
1180 struct rbd_req_coll *coll,
1181 int coll_index)
602adf40
YS
1182{
1183 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1184 snapid,
602adf40
YS
1185 CEPH_OSD_OP_READ,
1186 CEPH_OSD_FLAG_READ,
1fec7093 1187 ofs, len, bio, coll, coll_index);
602adf40
YS
1188}
1189
1190/*
1191 * Request sync osd read
1192 */
0ce1a794 1193static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40 1194 u64 snapid,
aded07ea 1195 const char *object_name,
602adf40 1196 u64 ofs, u64 len,
59c2be1e
YS
1197 char *buf,
1198 u64 *ver)
602adf40 1199{
913d2fdc
AE
1200 struct ceph_osd_req_op *ops;
1201 int ret;
1202
1203 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1204 if (!ops)
1205 return -ENOMEM;
1206
1207 ret = rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1208 snapid,
602adf40 1209 CEPH_OSD_FLAG_READ,
913d2fdc
AE
1210 ops, object_name, ofs, len, buf, NULL, ver);
1211 rbd_destroy_ops(ops);
1212
1213 return ret;
602adf40
YS
1214}
1215
1216/*
59c2be1e
YS
1217 * Request sync osd watch
1218 */
0ce1a794 1219static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
59c2be1e 1220 u64 ver,
7f0a24d8 1221 u64 notify_id)
59c2be1e
YS
1222{
1223 struct ceph_osd_req_op *ops;
11f77002
SW
1224 int ret;
1225
57cfc106
AE
1226 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1227 if (!ops)
1228 return -ENOMEM;
59c2be1e 1229
a71b891b 1230 ops[0].watch.ver = cpu_to_le64(ver);
59c2be1e
YS
1231 ops[0].watch.cookie = notify_id;
1232 ops[0].watch.flag = 0;
1233
0ce1a794 1234 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
7f0a24d8 1235 rbd_dev->header_name, 0, 0, NULL,
ad4f232f 1236 NULL, 0,
59c2be1e
YS
1237 CEPH_OSD_FLAG_READ,
1238 ops,
1fec7093 1239 NULL, 0,
59c2be1e
YS
1240 rbd_simple_req_cb, 0, NULL);
1241
1242 rbd_destroy_ops(ops);
1243 return ret;
1244}
1245
1246static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1247{
0ce1a794 1248 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1249 u64 hver;
13143d2d
SW
1250 int rc;
1251
0ce1a794 1252 if (!rbd_dev)
59c2be1e
YS
1253 return;
1254
bd919d45
AE
1255 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1256 rbd_dev->header_name, (unsigned long long) notify_id,
1257 (unsigned int) opcode);
1fe5e993 1258 rc = rbd_refresh_header(rbd_dev, &hver);
13143d2d 1259 if (rc)
f0f8cef5 1260 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
0ce1a794 1261 " update snaps: %d\n", rbd_dev->major, rc);
59c2be1e 1262
7f0a24d8 1263 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1264}
1265
1266/*
1267 * Request sync osd watch
1268 */
0e6f322d 1269static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
59c2be1e
YS
1270{
1271 struct ceph_osd_req_op *ops;
0ce1a794 1272 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
57cfc106 1273 int ret;
59c2be1e 1274
57cfc106
AE
1275 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1276 if (!ops)
1277 return -ENOMEM;
59c2be1e
YS
1278
1279 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
0ce1a794 1280 (void *)rbd_dev, &rbd_dev->watch_event);
59c2be1e
YS
1281 if (ret < 0)
1282 goto fail;
1283
0e6f322d 1284 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
0ce1a794 1285 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
59c2be1e
YS
1286 ops[0].watch.flag = 1;
1287
0ce1a794 1288 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1289 CEPH_NOSNAP,
59c2be1e
YS
1290 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1291 ops,
0e6f322d
AE
1292 rbd_dev->header_name,
1293 0, 0, NULL,
0ce1a794 1294 &rbd_dev->watch_request, NULL);
59c2be1e
YS
1295
1296 if (ret < 0)
1297 goto fail_event;
1298
1299 rbd_destroy_ops(ops);
1300 return 0;
1301
1302fail_event:
0ce1a794
AE
1303 ceph_osdc_cancel_event(rbd_dev->watch_event);
1304 rbd_dev->watch_event = NULL;
59c2be1e
YS
1305fail:
1306 rbd_destroy_ops(ops);
1307 return ret;
1308}
1309
79e3057c
YS
1310/*
1311 * Request sync osd unwatch
1312 */
070c633f 1313static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
79e3057c
YS
1314{
1315 struct ceph_osd_req_op *ops;
57cfc106 1316 int ret;
79e3057c 1317
57cfc106
AE
1318 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1319 if (!ops)
1320 return -ENOMEM;
79e3057c
YS
1321
1322 ops[0].watch.ver = 0;
0ce1a794 1323 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
79e3057c
YS
1324 ops[0].watch.flag = 0;
1325
0ce1a794 1326 ret = rbd_req_sync_op(rbd_dev, NULL,
79e3057c 1327 CEPH_NOSNAP,
79e3057c
YS
1328 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1329 ops,
070c633f
AE
1330 rbd_dev->header_name,
1331 0, 0, NULL, NULL, NULL);
1332
79e3057c
YS
1333
1334 rbd_destroy_ops(ops);
0ce1a794
AE
1335 ceph_osdc_cancel_event(rbd_dev->watch_event);
1336 rbd_dev->watch_event = NULL;
79e3057c
YS
1337 return ret;
1338}
1339
59c2be1e 1340struct rbd_notify_info {
0ce1a794 1341 struct rbd_device *rbd_dev;
59c2be1e
YS
1342};
1343
1344static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1345{
0ce1a794
AE
1346 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1347 if (!rbd_dev)
59c2be1e
YS
1348 return;
1349
bd919d45
AE
1350 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1351 rbd_dev->header_name, (unsigned long long) notify_id,
1352 (unsigned int) opcode);
59c2be1e
YS
1353}
1354
1355/*
1356 * Request sync osd notify
1357 */
4cb16250 1358static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
59c2be1e
YS
1359{
1360 struct ceph_osd_req_op *ops;
0ce1a794 1361 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
59c2be1e
YS
1362 struct ceph_osd_event *event;
1363 struct rbd_notify_info info;
1364 int payload_len = sizeof(u32) + sizeof(u32);
1365 int ret;
1366
57cfc106
AE
1367 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1368 if (!ops)
1369 return -ENOMEM;
59c2be1e 1370
0ce1a794 1371 info.rbd_dev = rbd_dev;
59c2be1e
YS
1372
1373 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1374 (void *)&info, &event);
1375 if (ret < 0)
1376 goto fail;
1377
1378 ops[0].watch.ver = 1;
1379 ops[0].watch.flag = 1;
1380 ops[0].watch.cookie = event->cookie;
1381 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1382 ops[0].watch.timeout = 12;
1383
0ce1a794 1384 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1385 CEPH_NOSNAP,
59c2be1e
YS
1386 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1387 ops,
4cb16250
AE
1388 rbd_dev->header_name,
1389 0, 0, NULL, NULL, NULL);
59c2be1e
YS
1390 if (ret < 0)
1391 goto fail_event;
1392
1393 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1394 dout("ceph_osdc_wait_event returned %d\n", ret);
1395 rbd_destroy_ops(ops);
1396 return 0;
1397
1398fail_event:
1399 ceph_osdc_cancel_event(event);
1400fail:
1401 rbd_destroy_ops(ops);
1402 return ret;
1403}
1404
602adf40
YS
1405/*
1406 * Request sync osd read
1407 */
0ce1a794 1408static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
aded07ea
AE
1409 const char *object_name,
1410 const char *class_name,
1411 const char *method_name,
602adf40 1412 const char *data,
59c2be1e
YS
1413 int len,
1414 u64 *ver)
602adf40
YS
1415{
1416 struct ceph_osd_req_op *ops;
aded07ea
AE
1417 int class_name_len = strlen(class_name);
1418 int method_name_len = strlen(method_name);
57cfc106
AE
1419 int ret;
1420
1421 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
aded07ea 1422 class_name_len + method_name_len + len);
57cfc106
AE
1423 if (!ops)
1424 return -ENOMEM;
602adf40 1425
aded07ea
AE
1426 ops[0].cls.class_name = class_name;
1427 ops[0].cls.class_len = (__u8) class_name_len;
1428 ops[0].cls.method_name = method_name;
1429 ops[0].cls.method_len = (__u8) method_name_len;
602adf40
YS
1430 ops[0].cls.argc = 0;
1431 ops[0].cls.indata = data;
1432 ops[0].cls.indata_len = len;
1433
0ce1a794 1434 ret = rbd_req_sync_op(rbd_dev, NULL,
602adf40 1435 CEPH_NOSNAP,
602adf40
YS
1436 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1437 ops,
d1f57ea6 1438 object_name, 0, 0, NULL, NULL, ver);
602adf40
YS
1439
1440 rbd_destroy_ops(ops);
1441
1442 dout("cls_exec returned %d\n", ret);
1443 return ret;
1444}
1445
1fec7093
YS
1446static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1447{
1448 struct rbd_req_coll *coll =
1449 kzalloc(sizeof(struct rbd_req_coll) +
1450 sizeof(struct rbd_req_status) * num_reqs,
1451 GFP_ATOMIC);
1452
1453 if (!coll)
1454 return NULL;
1455 coll->total = num_reqs;
1456 kref_init(&coll->kref);
1457 return coll;
1458}
1459
602adf40
YS
1460/*
1461 * block device queue callback
1462 */
1463static void rbd_rq_fn(struct request_queue *q)
1464{
1465 struct rbd_device *rbd_dev = q->queuedata;
1466 struct request *rq;
1467 struct bio_pair *bp = NULL;
1468
00f1f36f 1469 while ((rq = blk_fetch_request(q))) {
602adf40
YS
1470 struct bio *bio;
1471 struct bio *rq_bio, *next_bio = NULL;
1472 bool do_write;
bd919d45
AE
1473 unsigned int size;
1474 u64 op_size = 0;
602adf40 1475 u64 ofs;
1fec7093
YS
1476 int num_segs, cur_seg = 0;
1477 struct rbd_req_coll *coll;
d1d25646 1478 struct ceph_snap_context *snapc;
602adf40
YS
1479
1480 /* peek at request from block layer */
1481 if (!rq)
1482 break;
1483
1484 dout("fetched request\n");
1485
1486 /* filter out block requests we don't understand */
1487 if ((rq->cmd_type != REQ_TYPE_FS)) {
1488 __blk_end_request_all(rq, 0);
00f1f36f 1489 continue;
602adf40
YS
1490 }
1491
1492 /* deduce our operation (read, write) */
1493 do_write = (rq_data_dir(rq) == WRITE);
1494
1495 size = blk_rq_bytes(rq);
593a9e7b 1496 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
602adf40
YS
1497 rq_bio = rq->bio;
1498 if (do_write && rbd_dev->read_only) {
1499 __blk_end_request_all(rq, -EROFS);
00f1f36f 1500 continue;
602adf40
YS
1501 }
1502
1503 spin_unlock_irq(q->queue_lock);
1504
d1d25646 1505 down_read(&rbd_dev->header_rwsem);
e88a36ec 1506
d1d25646 1507 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
e88a36ec 1508 up_read(&rbd_dev->header_rwsem);
d1d25646
JD
1509 dout("request for non-existent snapshot");
1510 spin_lock_irq(q->queue_lock);
1511 __blk_end_request_all(rq, -ENXIO);
1512 continue;
e88a36ec
JD
1513 }
1514
d1d25646
JD
1515 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1516
1517 up_read(&rbd_dev->header_rwsem);
1518
602adf40
YS
1519 dout("%s 0x%x bytes at 0x%llx\n",
1520 do_write ? "write" : "read",
bd919d45 1521 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
602adf40 1522
1fec7093
YS
1523 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1524 coll = rbd_alloc_coll(num_segs);
1525 if (!coll) {
1526 spin_lock_irq(q->queue_lock);
1527 __blk_end_request_all(rq, -ENOMEM);
d1d25646 1528 ceph_put_snap_context(snapc);
00f1f36f 1529 continue;
1fec7093
YS
1530 }
1531
602adf40
YS
1532 do {
1533 /* a bio clone to be passed down to OSD req */
bd919d45 1534 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
602adf40 1535 op_size = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1536 rbd_dev->header.object_prefix,
602adf40
YS
1537 ofs, size,
1538 NULL, NULL);
1fec7093 1539 kref_get(&coll->kref);
602adf40
YS
1540 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1541 op_size, GFP_ATOMIC);
1542 if (!bio) {
1fec7093
YS
1543 rbd_coll_end_req_index(rq, coll, cur_seg,
1544 -ENOMEM, op_size);
1545 goto next_seg;
602adf40
YS
1546 }
1547
1fec7093 1548
602adf40
YS
1549 /* init OSD command: write or read */
1550 if (do_write)
1551 rbd_req_write(rq, rbd_dev,
d1d25646 1552 snapc,
602adf40 1553 ofs,
1fec7093
YS
1554 op_size, bio,
1555 coll, cur_seg);
602adf40
YS
1556 else
1557 rbd_req_read(rq, rbd_dev,
77dfe99f 1558 rbd_dev->snap_id,
602adf40 1559 ofs,
1fec7093
YS
1560 op_size, bio,
1561 coll, cur_seg);
602adf40 1562
1fec7093 1563next_seg:
602adf40
YS
1564 size -= op_size;
1565 ofs += op_size;
1566
1fec7093 1567 cur_seg++;
602adf40
YS
1568 rq_bio = next_bio;
1569 } while (size > 0);
1fec7093 1570 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1571
1572 if (bp)
1573 bio_pair_release(bp);
602adf40 1574 spin_lock_irq(q->queue_lock);
d1d25646
JD
1575
1576 ceph_put_snap_context(snapc);
602adf40
YS
1577 }
1578}
1579
1580/*
1581 * a queue callback. Makes sure that we don't create a bio that spans across
1582 * multiple osd objects. One exception would be with a single page bios,
1583 * which we handle later at bio_chain_clone
1584 */
1585static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1586 struct bio_vec *bvec)
1587{
1588 struct rbd_device *rbd_dev = q->queuedata;
593a9e7b
AE
1589 unsigned int chunk_sectors;
1590 sector_t sector;
1591 unsigned int bio_sectors;
602adf40
YS
1592 int max;
1593
593a9e7b
AE
1594 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1595 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1596 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1597
602adf40 1598 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
593a9e7b 1599 + bio_sectors)) << SECTOR_SHIFT;
602adf40
YS
1600 if (max < 0)
1601 max = 0; /* bio_add cannot handle a negative return */
1602 if (max <= bvec->bv_len && bio_sectors == 0)
1603 return bvec->bv_len;
1604 return max;
1605}
1606
1607static void rbd_free_disk(struct rbd_device *rbd_dev)
1608{
1609 struct gendisk *disk = rbd_dev->disk;
1610
1611 if (!disk)
1612 return;
1613
1614 rbd_header_free(&rbd_dev->header);
1615
1616 if (disk->flags & GENHD_FL_UP)
1617 del_gendisk(disk);
1618 if (disk->queue)
1619 blk_cleanup_queue(disk->queue);
1620 put_disk(disk);
1621}
1622
1623/*
4156d998
AE
1624 * Read the complete header for the given rbd device.
1625 *
1626 * Returns a pointer to a dynamically-allocated buffer containing
1627 * the complete and validated header. Caller can pass the address
1628 * of a variable that will be filled in with the version of the
1629 * header object at the time it was read.
1630 *
1631 * Returns a pointer-coded errno if a failure occurs.
602adf40 1632 */
4156d998
AE
1633static struct rbd_image_header_ondisk *
1634rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 1635{
4156d998 1636 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 1637 u32 snap_count = 0;
4156d998
AE
1638 u64 names_size = 0;
1639 u32 want_count;
1640 int ret;
602adf40 1641
00f1f36f 1642 /*
4156d998
AE
1643 * The complete header will include an array of its 64-bit
1644 * snapshot ids, followed by the names of those snapshots as
1645 * a contiguous block of NUL-terminated strings. Note that
1646 * the number of snapshots could change by the time we read
1647 * it in, in which case we re-read it.
00f1f36f 1648 */
4156d998
AE
1649 do {
1650 size_t size;
1651
1652 kfree(ondisk);
1653
1654 size = sizeof (*ondisk);
1655 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1656 size += names_size;
1657 ondisk = kmalloc(size, GFP_KERNEL);
1658 if (!ondisk)
1659 return ERR_PTR(-ENOMEM);
1660
1661 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
0bed54dc 1662 rbd_dev->header_name,
4156d998
AE
1663 0, size,
1664 (char *) ondisk, version);
1665
1666 if (ret < 0)
1667 goto out_err;
1668 if (WARN_ON((size_t) ret < size)) {
1669 ret = -ENXIO;
1670 pr_warning("short header read for image %s"
1671 " (want %zd got %d)\n",
1672 rbd_dev->image_name, size, ret);
1673 goto out_err;
1674 }
1675 if (!rbd_dev_ondisk_valid(ondisk)) {
1676 ret = -ENXIO;
1677 pr_warning("invalid header for image %s\n",
1678 rbd_dev->image_name);
1679 goto out_err;
81e759fb 1680 }
602adf40 1681
4156d998
AE
1682 names_size = le64_to_cpu(ondisk->snap_names_len);
1683 want_count = snap_count;
1684 snap_count = le32_to_cpu(ondisk->snap_count);
1685 } while (snap_count != want_count);
00f1f36f 1686
4156d998 1687 return ondisk;
00f1f36f 1688
4156d998
AE
1689out_err:
1690 kfree(ondisk);
1691
1692 return ERR_PTR(ret);
1693}
1694
1695/*
1696 * reload the ondisk the header
1697 */
1698static int rbd_read_header(struct rbd_device *rbd_dev,
1699 struct rbd_image_header *header)
1700{
1701 struct rbd_image_header_ondisk *ondisk;
1702 u64 ver = 0;
1703 int ret;
602adf40 1704
4156d998
AE
1705 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1706 if (IS_ERR(ondisk))
1707 return PTR_ERR(ondisk);
1708 ret = rbd_header_from_disk(header, ondisk);
1709 if (ret >= 0)
1710 header->obj_version = ver;
1711 kfree(ondisk);
1712
1713 return ret;
602adf40
YS
1714}
1715
1716/*
1717 * create a snapshot
1718 */
0ce1a794 1719static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1720 const char *snap_name,
1721 gfp_t gfp_flags)
1722{
1723 int name_len = strlen(snap_name);
1724 u64 new_snapid;
1725 int ret;
916d4d67 1726 void *data, *p, *e;
1dbb4399 1727 struct ceph_mon_client *monc;
602adf40
YS
1728
1729 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1730 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1731 return -EINVAL;
1732
0ce1a794
AE
1733 monc = &rbd_dev->rbd_client->client->monc;
1734 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1735 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1736 if (ret < 0)
1737 return ret;
1738
1739 data = kmalloc(name_len + 16, gfp_flags);
1740 if (!data)
1741 return -ENOMEM;
1742
916d4d67
SW
1743 p = data;
1744 e = data + name_len + 16;
602adf40 1745
916d4d67
SW
1746 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1747 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1748
0bed54dc 1749 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1750 "rbd", "snap_add",
d67d4be5 1751 data, p - data, NULL);
602adf40 1752
916d4d67 1753 kfree(data);
602adf40 1754
505cbb9b 1755 return ret < 0 ? ret : 0;
602adf40
YS
1756bad:
1757 return -ERANGE;
1758}
1759
dfc5606d
YS
1760static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1761{
1762 struct rbd_snap *snap;
a0593290 1763 struct rbd_snap *next;
dfc5606d 1764
a0593290 1765 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1766 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1767}
1768
602adf40
YS
1769/*
1770 * only read the first part of the ondisk header, without the snaps info
1771 */
b813623a 1772static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
1773{
1774 int ret;
1775 struct rbd_image_header h;
602adf40
YS
1776
1777 ret = rbd_read_header(rbd_dev, &h);
1778 if (ret < 0)
1779 return ret;
1780
a51aa0c0
JD
1781 down_write(&rbd_dev->header_rwsem);
1782
9db4b3e3 1783 /* resized? */
474ef7ce
JD
1784 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1785 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1786
1787 dout("setting size to %llu sectors", (unsigned long long) size);
1788 set_capacity(rbd_dev->disk, size);
1789 }
9db4b3e3 1790
849b4260 1791 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 1792 kfree(rbd_dev->header.snap_sizes);
849b4260 1793 kfree(rbd_dev->header.snap_names);
d1d25646
JD
1794 /* osd requests may still refer to snapc */
1795 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 1796
b813623a
AE
1797 if (hver)
1798 *hver = h.obj_version;
a71b891b 1799 rbd_dev->header.obj_version = h.obj_version;
93a24e08 1800 rbd_dev->header.image_size = h.image_size;
602adf40
YS
1801 rbd_dev->header.total_snaps = h.total_snaps;
1802 rbd_dev->header.snapc = h.snapc;
1803 rbd_dev->header.snap_names = h.snap_names;
1804 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
1805 /* Free the extra copy of the object prefix */
1806 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1807 kfree(h.object_prefix);
1808
dfc5606d
YS
1809 ret = __rbd_init_snaps_header(rbd_dev);
1810
c666601a 1811 up_write(&rbd_dev->header_rwsem);
602adf40 1812
dfc5606d 1813 return ret;
602adf40
YS
1814}
1815
1fe5e993
AE
1816static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1817{
1818 int ret;
1819
1820 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1821 ret = __rbd_refresh_header(rbd_dev, hver);
1822 mutex_unlock(&ctl_mutex);
1823
1824 return ret;
1825}
1826
602adf40
YS
1827static int rbd_init_disk(struct rbd_device *rbd_dev)
1828{
1829 struct gendisk *disk;
1830 struct request_queue *q;
1831 int rc;
593a9e7b 1832 u64 segment_size;
602adf40
YS
1833 u64 total_size = 0;
1834
1835 /* contact OSD, request size info about the object being mapped */
1836 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1837 if (rc)
1838 return rc;
1839
dfc5606d
YS
1840 /* no need to lock here, as rbd_dev is not registered yet */
1841 rc = __rbd_init_snaps_header(rbd_dev);
1842 if (rc)
1843 return rc;
1844
cc9d734c 1845 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1846 if (rc)
1847 return rc;
1848
1849 /* create gendisk info */
1850 rc = -ENOMEM;
1851 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1852 if (!disk)
1853 goto out;
1854
f0f8cef5 1855 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 1856 rbd_dev->dev_id);
602adf40
YS
1857 disk->major = rbd_dev->major;
1858 disk->first_minor = 0;
1859 disk->fops = &rbd_bd_ops;
1860 disk->private_data = rbd_dev;
1861
1862 /* init rq */
1863 rc = -ENOMEM;
1864 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1865 if (!q)
1866 goto out_disk;
029bcbd8 1867
593a9e7b
AE
1868 /* We use the default size, but let's be explicit about it. */
1869 blk_queue_physical_block_size(q, SECTOR_SIZE);
1870
029bcbd8 1871 /* set io sizes to object size */
593a9e7b
AE
1872 segment_size = rbd_obj_bytes(&rbd_dev->header);
1873 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1874 blk_queue_max_segment_size(q, segment_size);
1875 blk_queue_io_min(q, segment_size);
1876 blk_queue_io_opt(q, segment_size);
029bcbd8 1877
602adf40
YS
1878 blk_queue_merge_bvec(q, rbd_merge_bvec);
1879 disk->queue = q;
1880
1881 q->queuedata = rbd_dev;
1882
1883 rbd_dev->disk = disk;
1884 rbd_dev->q = q;
1885
1886 /* finally, announce the disk to the world */
593a9e7b 1887 set_capacity(disk, total_size / SECTOR_SIZE);
602adf40
YS
1888 add_disk(disk);
1889
1890 pr_info("%s: added with size 0x%llx\n",
1891 disk->disk_name, (unsigned long long)total_size);
1892 return 0;
1893
1894out_disk:
1895 put_disk(disk);
1896out:
1897 return rc;
1898}
1899
dfc5606d
YS
1900/*
1901 sysfs
1902*/
1903
593a9e7b
AE
1904static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1905{
1906 return container_of(dev, struct rbd_device, dev);
1907}
1908
dfc5606d
YS
1909static ssize_t rbd_size_show(struct device *dev,
1910 struct device_attribute *attr, char *buf)
1911{
593a9e7b 1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1913 sector_t size;
1914
1915 down_read(&rbd_dev->header_rwsem);
1916 size = get_capacity(rbd_dev->disk);
1917 up_read(&rbd_dev->header_rwsem);
dfc5606d 1918
a51aa0c0 1919 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1920}
1921
1922static ssize_t rbd_major_show(struct device *dev,
1923 struct device_attribute *attr, char *buf)
1924{
593a9e7b 1925 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1926
dfc5606d
YS
1927 return sprintf(buf, "%d\n", rbd_dev->major);
1928}
1929
1930static ssize_t rbd_client_id_show(struct device *dev,
1931 struct device_attribute *attr, char *buf)
602adf40 1932{
593a9e7b 1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1934
1dbb4399
AE
1935 return sprintf(buf, "client%lld\n",
1936 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1937}
1938
dfc5606d
YS
1939static ssize_t rbd_pool_show(struct device *dev,
1940 struct device_attribute *attr, char *buf)
602adf40 1941{
593a9e7b 1942 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1943
1944 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1945}
1946
9bb2f334
AE
1947static ssize_t rbd_pool_id_show(struct device *dev,
1948 struct device_attribute *attr, char *buf)
1949{
1950 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1953}
1954
dfc5606d
YS
1955static ssize_t rbd_name_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1957{
593a9e7b 1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1959
0bed54dc 1960 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1961}
1962
1963static ssize_t rbd_snap_show(struct device *dev,
1964 struct device_attribute *attr,
1965 char *buf)
1966{
593a9e7b 1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1968
1969 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1970}
1971
1972static ssize_t rbd_image_refresh(struct device *dev,
1973 struct device_attribute *attr,
1974 const char *buf,
1975 size_t size)
1976{
593a9e7b 1977 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 1978 int ret;
602adf40 1979
1fe5e993 1980 ret = rbd_refresh_header(rbd_dev, NULL);
b813623a
AE
1981
1982 return ret < 0 ? ret : size;
dfc5606d 1983}
602adf40 1984
dfc5606d
YS
1985static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1986static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1987static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1988static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 1989static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d
YS
1990static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1991static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1992static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1993static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1994
1995static struct attribute *rbd_attrs[] = {
1996 &dev_attr_size.attr,
1997 &dev_attr_major.attr,
1998 &dev_attr_client_id.attr,
1999 &dev_attr_pool.attr,
9bb2f334 2000 &dev_attr_pool_id.attr,
dfc5606d
YS
2001 &dev_attr_name.attr,
2002 &dev_attr_current_snap.attr,
2003 &dev_attr_refresh.attr,
2004 &dev_attr_create_snap.attr,
dfc5606d
YS
2005 NULL
2006};
2007
2008static struct attribute_group rbd_attr_group = {
2009 .attrs = rbd_attrs,
2010};
2011
2012static const struct attribute_group *rbd_attr_groups[] = {
2013 &rbd_attr_group,
2014 NULL
2015};
2016
2017static void rbd_sysfs_dev_release(struct device *dev)
2018{
2019}
2020
2021static struct device_type rbd_device_type = {
2022 .name = "rbd",
2023 .groups = rbd_attr_groups,
2024 .release = rbd_sysfs_dev_release,
2025};
2026
2027
2028/*
2029 sysfs - snapshots
2030*/
2031
2032static ssize_t rbd_snap_size_show(struct device *dev,
2033 struct device_attribute *attr,
2034 char *buf)
2035{
2036 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2037
3591538f 2038 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
2039}
2040
2041static ssize_t rbd_snap_id_show(struct device *dev,
2042 struct device_attribute *attr,
2043 char *buf)
2044{
2045 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2046
3591538f 2047 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2048}
2049
2050static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2051static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2052
2053static struct attribute *rbd_snap_attrs[] = {
2054 &dev_attr_snap_size.attr,
2055 &dev_attr_snap_id.attr,
2056 NULL,
2057};
2058
2059static struct attribute_group rbd_snap_attr_group = {
2060 .attrs = rbd_snap_attrs,
2061};
2062
2063static void rbd_snap_dev_release(struct device *dev)
2064{
2065 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2066 kfree(snap->name);
2067 kfree(snap);
2068}
2069
2070static const struct attribute_group *rbd_snap_attr_groups[] = {
2071 &rbd_snap_attr_group,
2072 NULL
2073};
2074
2075static struct device_type rbd_snap_device_type = {
2076 .groups = rbd_snap_attr_groups,
2077 .release = rbd_snap_dev_release,
2078};
2079
14e7085d 2080static void __rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
2081{
2082 list_del(&snap->node);
2083 device_unregister(&snap->dev);
2084}
2085
14e7085d 2086static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2087 struct device *parent)
2088{
2089 struct device *dev = &snap->dev;
2090 int ret;
2091
2092 dev->type = &rbd_snap_device_type;
2093 dev->parent = parent;
2094 dev->release = rbd_snap_dev_release;
2095 dev_set_name(dev, "snap_%s", snap->name);
2096 ret = device_register(dev);
2097
2098 return ret;
2099}
2100
4e891e0a
AE
2101static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2102 int i, const char *name)
dfc5606d 2103{
4e891e0a 2104 struct rbd_snap *snap;
dfc5606d 2105 int ret;
4e891e0a
AE
2106
2107 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 2108 if (!snap)
4e891e0a
AE
2109 return ERR_PTR(-ENOMEM);
2110
2111 ret = -ENOMEM;
dfc5606d 2112 snap->name = kstrdup(name, GFP_KERNEL);
4e891e0a
AE
2113 if (!snap->name)
2114 goto err;
2115
dfc5606d
YS
2116 snap->size = rbd_dev->header.snap_sizes[i];
2117 snap->id = rbd_dev->header.snapc->snaps[i];
2118 if (device_is_registered(&rbd_dev->dev)) {
14e7085d 2119 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d
YS
2120 if (ret < 0)
2121 goto err;
2122 }
4e891e0a
AE
2123
2124 return snap;
2125
dfc5606d
YS
2126err:
2127 kfree(snap->name);
2128 kfree(snap);
4e891e0a
AE
2129
2130 return ERR_PTR(ret);
dfc5606d
YS
2131}
2132
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snaphots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 *
 * Returns 0 on success, or a negative errno if creating a new
 * snapshot device fails.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	/*
	 * Single merge pass over two sequences sorted the same way:
	 * the new snapshot context (indexed by "index") and the
	 * existing snapshot list (walked via "links").
	 */
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		/* CEPH_NOSNAP marks exhaustion of the new context */
		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		BUG_ON(snap && snap->id == CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			/* If it was the mapped snapshot, it no longer exists */
			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;	/* do not consume a context entry */
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			/* Known snapshots must not change size or name */
			BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
			BUG_ON(strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2214
dfc5606d
YS
2215static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2216{
f0f8cef5 2217 int ret;
dfc5606d
YS
2218 struct device *dev;
2219 struct rbd_snap *snap;
2220
2221 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2222 dev = &rbd_dev->dev;
2223
2224 dev->bus = &rbd_bus_type;
2225 dev->type = &rbd_device_type;
2226 dev->parent = &rbd_root_dev;
2227 dev->release = rbd_dev_release;
de71a297 2228 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d
YS
2229 ret = device_register(dev);
2230 if (ret < 0)
f0f8cef5 2231 goto out;
dfc5606d
YS
2232
2233 list_for_each_entry(snap, &rbd_dev->snaps, node) {
14e7085d 2234 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d 2235 if (ret < 0)
602adf40
YS
2236 break;
2237 }
f0f8cef5 2238out:
dfc5606d
YS
2239 mutex_unlock(&ctl_mutex);
2240 return ret;
602adf40
YS
2241}
2242
dfc5606d
YS
2243static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2244{
2245 device_unregister(&rbd_dev->dev);
2246}
2247
59c2be1e
YS
2248static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2249{
2250 int ret, rc;
2251
2252 do {
0e6f322d 2253 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2254 if (ret == -ERANGE) {
1fe5e993 2255 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2256 if (rc < 0)
2257 return rc;
2258 }
2259 } while (ret == -ERANGE);
2260
2261 return ret;
2262}
2263
1ddbe94e
AE
/* Highest rbd device id handed out so far (ids start at 1) */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	/* atomic_inc_return makes the id allocation itself lock-free */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2278
1ddbe94e 2279/*
499afd5b
AE
2280 * Remove an rbd_dev from the global list, and record that its
2281 * identifier is no longer in use.
1ddbe94e 2282 */
499afd5b 2283static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2284{
d184f6bf 2285 struct list_head *tmp;
de71a297 2286 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2287 int max_id;
2288
2289 BUG_ON(rbd_id < 1);
499afd5b
AE
2290
2291 spin_lock(&rbd_dev_list_lock);
2292 list_del_init(&rbd_dev->node);
d184f6bf
AE
2293
2294 /*
2295 * If the id being "put" is not the current maximum, there
2296 * is nothing special we need to do.
2297 */
2298 if (rbd_id != atomic64_read(&rbd_id_max)) {
2299 spin_unlock(&rbd_dev_list_lock);
2300 return;
2301 }
2302
2303 /*
2304 * We need to update the current maximum id. Search the
2305 * list to find out what it is. We're more likely to find
2306 * the maximum at the end, so search the list backward.
2307 */
2308 max_id = 0;
2309 list_for_each_prev(tmp, &rbd_dev_list) {
2310 struct rbd_device *rbd_dev;
2311
2312 rbd_dev = list_entry(tmp, struct rbd_device, node);
2313 if (rbd_id > max_id)
2314 max_id = rbd_id;
2315 }
499afd5b 2316 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2317
1ddbe94e 2318 /*
d184f6bf
AE
2319 * The max id could have been updated by rbd_id_get(), in
2320 * which case it now accurately reflects the new maximum.
2321 * Be careful not to overwrite the maximum value in that
2322 * case.
1ddbe94e 2323 */
d184f6bf 2324 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2325}
2326
e28fff26
AE
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";

	*buf += strspn(*buf, whitespace);	/* skip to start of token */

	return strcspn(*buf, whitespace);	/* length of that token */
}
2345
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	/* Copy only when the token (plus its terminator) fits */
	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;	/* consumed even when not copied */

	return token_len;
}
2375
ea3352f4
AE
2376/*
2377 * Finds the next token in *buf, dynamically allocates a buffer big
2378 * enough to hold a copy of it, and copies the token into the new
2379 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2380 * that a duplicate buffer is created even for a zero-length token.
2381 *
2382 * Returns a pointer to the newly-allocated duplicate, or a null
2383 * pointer if memory for the duplicate was not available. If
2384 * the lenp argument is a non-null pointer, the length of the token
2385 * (not including the '\0') is returned in *lenp.
2386 *
2387 * If successful, the *buf pointer will be updated to point beyond
2388 * the end of the found token.
2389 *
2390 * Note: uses GFP_KERNEL for allocation.
2391 */
2392static inline char *dup_token(const char **buf, size_t *lenp)
2393{
2394 char *dup;
2395 size_t len;
2396
2397 len = next_token(buf);
2398 dup = kmalloc(len + 1, GFP_KERNEL);
2399 if (!dup)
2400 return NULL;
2401
2402 memcpy(dup, *buf, len);
2403 *(dup + len) = '\0';
2404 *buf += len;
2405
2406 if (lenp)
2407 *lenp = len;
2408
2409 return dup;
2410}
2411
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 *
 * Returns 0 on success; -EINVAL if a required token is missing or
 * the options token does not fit; -ENOMEM on allocation failure
 * (with all partially-allocated fields freed and reset).
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	/* Note: *mon_addrs points into the caller's buf, not a copy */
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	/* Every failure past this point is an allocation failure */
	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
					+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free in reverse order of allocation; NULL fields are no-ops */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2494
59c2be1e
YS
/*
 * Handle a write to /sys/bus/rbd/add: parse the arguments, connect
 * to the cluster, and set up and announce a new rbd block device.
 * Returns count on success, or a negative errno on failure.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size includes the terminator; pass the bare length */
	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		rbd_dev->rbd_client = NULL;
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device (0 means "allocate a major for us") */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name set implies the other name fields were attempted too */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	/* kfree(NULL) is a no-op, so these are safe on all paths */
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2605
de71a297 2606static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2607{
2608 struct list_head *tmp;
2609 struct rbd_device *rbd_dev;
2610
e124a82f 2611 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2612 list_for_each(tmp, &rbd_dev_list) {
2613 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2614 if (rbd_dev->dev_id == dev_id) {
e124a82f 2615 spin_unlock(&rbd_dev_list_lock);
602adf40 2616 return rbd_dev;
e124a82f 2617 }
602adf40 2618 }
e124a82f 2619 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2620 return NULL;
2621}
2622
/*
 * Device release callback, invoked when the last reference to the
 * rbd device's struct device is dropped (via rbd_bus_del_dev()).
 * Tears down the watch, the client, the block device, and finally
 * the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object before tearing anything down */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2653
dfc5606d
YS
2654static ssize_t rbd_remove(struct bus_type *bus,
2655 const char *buf,
2656 size_t count)
602adf40
YS
2657{
2658 struct rbd_device *rbd_dev = NULL;
2659 int target_id, rc;
2660 unsigned long ul;
2661 int ret = count;
2662
2663 rc = strict_strtoul(buf, 10, &ul);
2664 if (rc)
2665 return rc;
2666
2667 /* convert to int; abort if we lost anything in the conversion */
2668 target_id = (int) ul;
2669 if (target_id != ul)
2670 return -EINVAL;
2671
2672 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2673
2674 rbd_dev = __rbd_get_dev(target_id);
2675 if (!rbd_dev) {
2676 ret = -ENOENT;
2677 goto done;
2678 }
2679
dfc5606d
YS
2680 __rbd_remove_all_snaps(rbd_dev);
2681 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2682
2683done:
2684 mutex_unlock(&ctl_mutex);
2685 return ret;
2686}
2687
dfc5606d
YS
/*
 * Sysfs handler that creates a new snapshot with the written name,
 * refreshes the header, and then notifies watchers (best effort).
 * Returns count on success, or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * NOTE(review): snprintf() with size "count" copies at most
	 * count - 1 bytes, silently dropping the final byte of buf --
	 * presumably the trailing newline sysfs writes include; confirm
	 * callers always supply one.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2728
602adf40
YS
2729/*
2730 * create control files in sysfs
dfc5606d 2731 * /sys/bus/rbd/...
602adf40
YS
2732 */
2733static int rbd_sysfs_init(void)
2734{
dfc5606d 2735 int ret;
602adf40 2736
fed4c143 2737 ret = device_register(&rbd_root_dev);
21079786 2738 if (ret < 0)
dfc5606d 2739 return ret;
602adf40 2740
fed4c143
AE
2741 ret = bus_register(&rbd_bus_type);
2742 if (ret < 0)
2743 device_unregister(&rbd_root_dev);
602adf40 2744
602adf40
YS
2745 return ret;
2746}
2747
2748static void rbd_sysfs_cleanup(void)
2749{
dfc5606d 2750 bus_unregister(&rbd_bus_type);
fed4c143 2751 device_unregister(&rbd_root_dev);
602adf40
YS
2752}
2753
2754int __init rbd_init(void)
2755{
2756 int rc;
2757
2758 rc = rbd_sysfs_init();
2759 if (rc)
2760 return rc;
f0f8cef5 2761 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2762 return 0;
2763}
2764
2765void __exit rbd_exit(void)
2766{
2767 rbd_sysfs_cleanup();
2768}
2769
2770module_init(rbd_init);
2771module_exit(rbd_exit);
2772
2773MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2774MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2775MODULE_DESCRIPTION("rados block device");
2776
2777/* following authorship retained from original osdblk.c */
2778MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2779
2780MODULE_LICENSE("GPL");