rbd: kill notify_timeout option
[linux-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
cc0538b6 72#define RBD_READ_ONLY_DEFAULT false
59c2be1e 73
602adf40
YS
74/*
75 * block device image metadata (in-memory version)
76 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes (from ondisk header) */
	char *object_prefix;	/* NUL-terminated copy of the ondisk prefix */
	__u8 obj_order;		/* each object holds 1 << obj_order bytes */
	__u8 crypt_type;	/* ondisk options.crypt_type (opaque here) */
	__u8 comp_type;		/* ondisk options.comp_type (opaque here) */
	struct ceph_snap_context *snapc;	/* snapshot ids + seq */
	u32 total_snaps;	/* snapshot count (ondisk snap_count) */

	char *snap_names;	/* all snap names, concatenated, NUL-separated */
	u64 *snap_sizes;	/* per-snapshot image size, indexed like snapc */

	u64 obj_version;	/* header object version (used on refresh) */
};
91
/* Mount-time options parsed by parse_rbd_opts_token(). */
struct rbd_options {
	bool read_only;		/* map the device read-only */
};
95
96/*
f0f8cef5 97 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
98 */
struct rbd_client {
	struct ceph_client *client;	/* the shared ceph client instance */
	struct kref kref;		/* refcount; release drops list entry */
	struct list_head node;		/* link on rbd_client_list */
};
104
105/*
f0f8cef5 106 * a request completion status
602adf40 107 */
1fec7093
YS
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion status code */
	u64 bytes;	/* bytes transferred for this sub-request */
};
113
114/*
115 * a collection of requests
116 */
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;			/* number of status slots allocated */
	int num_done;			/* completed (in order) so far */
	struct kref kref;		/* one ref per outstanding sub-request */
	struct rbd_req_status status[0];	/* trailing variable-length array */
};
123
f0f8cef5
AE
124/*
125 * a single io request
126 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* byte length of this request */
	int coll_index;			/* slot in coll->status[] */
	struct rbd_req_coll *coll;	/* parent collection (may be NULL) */
};
135
dfc5606d
YS
struct rbd_snap {
	struct device dev;		/* sysfs device for this snapshot */
	const char *name;		/* snapshot name */
	u64 size;			/* image size at this snapshot */
	struct list_head node;		/* link on rbd_dev->snaps */
	u64 id;				/* snapshot id */
};
143
602adf40
YS
144/*
145 * a single device
146 */
/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;		/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_options rbd_opts;	/* options parsed at map time */
	struct rbd_client *rbd_client;	/* shared ceph client (refcounted) */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;		/* queue lock */

	struct rbd_image_header header;
	char *image_name;		/* rbd image name */
	size_t image_name_len;
	char *header_name;		/* name of the image's header object */
	char *pool_name;
	int pool_id;

	struct ceph_osd_event *watch_event;	/* header watch registration */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	/* name of the snapshot this device reads from */
	char *snap_name;
	/* id of the snapshot this device reads from */
	u64 snap_id;		/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool snap_exists;
	bool read_only;

	struct list_head node;		/* link on rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
189
602adf40 190static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 191
602adf40 192static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
193static DEFINE_SPINLOCK(rbd_dev_list_lock);
194
432b8587
AE
195static LIST_HEAD(rbd_client_list); /* clients */
196static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 197
dfc5606d
YS
198static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199static void rbd_dev_release(struct device *dev);
dfc5606d
YS
200static ssize_t rbd_snap_add(struct device *dev,
201 struct device_attribute *attr,
202 const char *buf,
203 size_t count);
14e7085d 204static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 205
f0f8cef5
AE
206static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207 size_t count);
208static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
209 size_t count);
210
211static struct bus_attribute rbd_bus_attrs[] = {
212 __ATTR(add, S_IWUSR, NULL, rbd_add),
213 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
214 __ATTR_NULL
215};
216
217static struct bus_type rbd_bus_type = {
218 .name = "rbd",
219 .bus_attrs = rbd_bus_attrs,
220};
221
/* Intentionally empty: rbd_root_dev is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}
225
/* Parent sysfs device for all rbd devices ("/sys/devices/rbd"). */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
230
dfc5606d 231
dfc5606d
YS
/* Take a reference on the device's embedded sysfs device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
236
/* Drop the reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
602adf40 241
1fe5e993 242static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 243
602adf40
YS
244static int rbd_open(struct block_device *bdev, fmode_t mode)
245{
f0f8cef5 246 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 247
602adf40
YS
248 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
249 return -EROFS;
250
340c7a2b
AE
251 rbd_get_dev(rbd_dev);
252 set_device_ro(bdev, rbd_dev->read_only);
253
602adf40
YS
254 return 0;
255}
256
dfc5606d
YS
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
265
602adf40
YS
/* Block device operations: only open/release are needed. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
271
272/*
273 * Initialize an rbd client instance.
43ae4701 274 * We own *ceph_opts.
602adf40 275 */
f8c38929 276static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
277{
278 struct rbd_client *rbdc;
279 int ret = -ENOMEM;
280
281 dout("rbd_client_create\n");
282 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
283 if (!rbdc)
284 goto out_opt;
285
286 kref_init(&rbdc->kref);
287 INIT_LIST_HEAD(&rbdc->node);
288
bc534d86
AE
289 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
290
43ae4701 291 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 292 if (IS_ERR(rbdc->client))
bc534d86 293 goto out_mutex;
43ae4701 294 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
295
296 ret = ceph_open_session(rbdc->client);
297 if (ret < 0)
298 goto out_err;
299
432b8587 300 spin_lock(&rbd_client_list_lock);
602adf40 301 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 302 spin_unlock(&rbd_client_list_lock);
602adf40 303
bc534d86
AE
304 mutex_unlock(&ctl_mutex);
305
602adf40
YS
306 dout("rbd_client_create created %p\n", rbdc);
307 return rbdc;
308
309out_err:
310 ceph_destroy_client(rbdc->client);
bc534d86
AE
311out_mutex:
312 mutex_unlock(&ctl_mutex);
602adf40
YS
313 kfree(rbdc);
314out_opt:
43ae4701
AE
315 if (ceph_opts)
316 ceph_destroy_options(ceph_opts);
28f259b7 317 return ERR_PTR(ret);
602adf40
YS
318}
319
320/*
1f7ba331
AE
321 * Find a ceph client with specific addr and configuration. If
322 * found, bump its reference count.
602adf40 323 */
1f7ba331 324static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
325{
326 struct rbd_client *client_node;
1f7ba331 327 bool found = false;
602adf40 328
43ae4701 329 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
330 return NULL;
331
1f7ba331
AE
332 spin_lock(&rbd_client_list_lock);
333 list_for_each_entry(client_node, &rbd_client_list, node) {
334 if (!ceph_compare_options(ceph_opts, client_node->client)) {
335 kref_get(&client_node->kref);
336 found = true;
337 break;
338 }
339 }
340 spin_unlock(&rbd_client_list_lock);
341
342 return found ? client_node : NULL;
602adf40
YS
343}
344
59c2be1e
YS
345/*
346 * mount options
347 */
/* Token ids; the Opt_last_* sentinels partition ids by argument type. */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};
358
43ae4701 359static match_table_t rbd_opts_tokens = {
59c2be1e
YS
360 /* int args above */
361 /* string args above */
cc0538b6
AE
362 {Opt_read_only, "read_only"},
363 {Opt_read_only, "ro"}, /* Alternate spelling */
364 {Opt_read_write, "read_write"},
365 {Opt_read_write, "rw"}, /* Alternate spelling */
366 /* Boolean args above */
59c2be1e
YS
367 {-1, NULL}
368};
369
/*
 * ceph_parse_options() callback: parse one rbd-specific mount option
 * token into *private (a struct rbd_options).  Returns 0 or -errno.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Decode the argument according to the token-id range. */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* Every token id in the table must be handled above. */
		BUG_ON(token);
	}
	return 0;
}
409
602adf40
YS
410/*
411 * Get a ceph client with specific addr and configuration, if one does
412 * not exist create it.
413 */
f8c38929
AE
/*
 * Get a ceph client for rbd_dev: parse the monitor address and option
 * string, reuse a matching existing client if possible, otherwise
 * create a new one.  On success rbd_dev->rbd_client holds a reference.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		/* rbd_client_create() takes ownership of ceph_opts */
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
442
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client; callers
 * must NOT hold it when the final kref is dropped.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Unlink from the global client list before tearing down. */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
460
461/*
462 * Drop reference to ceph client node. If it's not referenced anymore, release
463 * it.
464 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* May free the client via rbd_client_release() on last ref. */
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
470
1fec7093
YS
471/*
472 * Destroy requests collection
473 */
/* kref release callback for a request collection. */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
602adf40 482
8e94af8e
AE
/* Sanity-check an ondisk image header before translating it. */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
511
602adf40
YS
512/*
513 * Create a new header structure, translate header format from the on-disk
514 * header.
515 */
516static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 517 struct rbd_image_header_ondisk *ondisk)
602adf40 518{
ccece235 519 u32 snap_count;
58c17b0e 520 size_t len;
d2bb24e5 521 size_t size;
621901d6 522 u32 i;
602adf40 523
6a52325f
AE
524 memset(header, 0, sizeof (*header));
525
103a150f
AE
526 snap_count = le32_to_cpu(ondisk->snap_count);
527
58c17b0e
AE
528 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
529 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 530 if (!header->object_prefix)
602adf40 531 return -ENOMEM;
58c17b0e
AE
532 memcpy(header->object_prefix, ondisk->object_prefix, len);
533 header->object_prefix[len] = '\0';
00f1f36f 534
602adf40 535 if (snap_count) {
f785cc1d
AE
536 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
537
621901d6
AE
538 /* Save a copy of the snapshot names */
539
f785cc1d
AE
540 if (snap_names_len > (u64) SIZE_MAX)
541 return -EIO;
542 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 543 if (!header->snap_names)
6a52325f 544 goto out_err;
f785cc1d
AE
545 /*
546 * Note that rbd_dev_v1_header_read() guarantees
547 * the ondisk buffer we're working with has
548 * snap_names_len bytes beyond the end of the
549 * snapshot id array, this memcpy() is safe.
550 */
551 memcpy(header->snap_names, &ondisk->snaps[snap_count],
552 snap_names_len);
6a52325f 553
621901d6
AE
554 /* Record each snapshot's size */
555
d2bb24e5
AE
556 size = snap_count * sizeof (*header->snap_sizes);
557 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 558 if (!header->snap_sizes)
6a52325f 559 goto out_err;
621901d6
AE
560 for (i = 0; i < snap_count; i++)
561 header->snap_sizes[i] =
562 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 563 } else {
ccece235 564 WARN_ON(ondisk->snap_names_len);
602adf40
YS
565 header->snap_names = NULL;
566 header->snap_sizes = NULL;
567 }
849b4260 568
602adf40
YS
569 header->image_size = le64_to_cpu(ondisk->image_size);
570 header->obj_order = ondisk->options.order;
571 header->crypt_type = ondisk->options.crypt_type;
572 header->comp_type = ondisk->options.comp_type;
6a52325f
AE
573 header->total_snaps = snap_count;
574
621901d6
AE
575 /* Allocate and fill in the snapshot context */
576
6a52325f
AE
577 size = sizeof (struct ceph_snap_context);
578 size += snap_count * sizeof (header->snapc->snaps[0]);
579 header->snapc = kzalloc(size, GFP_KERNEL);
580 if (!header->snapc)
581 goto out_err;
602adf40
YS
582
583 atomic_set(&header->snapc->nref, 1);
505cbb9b 584 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 585 header->snapc->num_snaps = snap_count;
621901d6
AE
586 for (i = 0; i < snap_count; i++)
587 header->snapc->snaps[i] =
588 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
589
590 return 0;
591
6a52325f 592out_err:
849b4260 593 kfree(header->snap_sizes);
ccece235 594 header->snap_sizes = NULL;
602adf40 595 kfree(header->snap_names);
ccece235 596 header->snap_names = NULL;
6a52325f
AE
597 kfree(header->object_prefix);
598 header->object_prefix = NULL;
ccece235 599
00f1f36f 600 return -ENOMEM;
602adf40
YS
601}
602
602adf40
YS
603static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
604 u64 *seq, u64 *size)
605{
606 int i;
607 char *p = header->snap_names;
608
00f1f36f
AE
609 for (i = 0; i < header->total_snaps; i++) {
610 if (!strcmp(snap_name, p)) {
602adf40 611
00f1f36f 612 /* Found it. Pass back its id and/or size */
602adf40 613
00f1f36f
AE
614 if (seq)
615 *seq = header->snapc->snaps[i];
616 if (size)
617 *size = header->snap_sizes[i];
618 return i;
619 }
620 p += strlen(p) + 1; /* Skip ahead to the next name */
621 }
622 return -ENOENT;
602adf40
YS
623}
624
0ce1a794 625static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 626{
78dc447d 627 int ret;
602adf40 628
0ce1a794 629 down_write(&rbd_dev->header_rwsem);
602adf40 630
0ce1a794 631 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 632 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 633 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 634 rbd_dev->snap_exists = false;
cc0538b6 635 rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
602adf40 636 if (size)
78dc447d 637 *size = rbd_dev->header.image_size;
602adf40 638 } else {
78dc447d
AE
639 u64 snap_id = 0;
640
641 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
642 &snap_id, size);
602adf40
YS
643 if (ret < 0)
644 goto done;
78dc447d 645 rbd_dev->snap_id = snap_id;
e88a36ec 646 rbd_dev->snap_exists = true;
cc0538b6 647 rbd_dev->read_only = true; /* No choice for snapshots */
602adf40
YS
648 }
649
650 ret = 0;
651done:
0ce1a794 652 up_write(&rbd_dev->header_rwsem);
602adf40
YS
653 return ret;
654}
655
/* Free everything rbd_header_from_disk() allocated; safe on a
 * partially-initialized header (kfree(NULL) is a no-op). */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
667
668/*
669 * get the actual striped segment name, offset and length
670 */
671static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 672 const char *object_prefix,
602adf40
YS
673 u64 ofs, u64 len,
674 char *seg_name, u64 *segofs)
675{
676 u64 seg = ofs >> header->obj_order;
677
678 if (seg_name)
679 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 680 "%s.%012llx", object_prefix, seg);
602adf40
YS
681
682 ofs = ofs & ((1 << header->obj_order) - 1);
683 len = min_t(u64, len, (1 << header->obj_order) - ofs);
684
685 if (segofs)
686 *segofs = ofs;
687
688 return len;
689}
690
1fec7093
YS
/* Number of objects an [ofs, ofs+len) extent spans.
 * NOTE(review): assumes len > 0 — "ofs + len - 1" would wrap for
 * len == 0; callers appear to pass nonzero lengths, but confirm. */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}
698
029bcbd8
JD
699/*
700 * returns the size of an object in the image
701 */
702static u64 rbd_obj_bytes(struct rbd_image_header *header)
703{
704 return 1 << header->obj_order;
705}
706
602adf40
YS
707/*
708 * bio helpers
709 */
710
711static void bio_chain_put(struct bio *chain)
712{
713 struct bio *tmp;
714
715 while (chain) {
716 tmp = chain;
717 chain = chain->bi_next;
718 bio_put(tmp);
719 }
720}
721
722/*
723 * zeros a bio chain, starting at specific offset
724 */
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			/* Zero the tail of any segment past start_ofs. */
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
748
749/*
750 * bio_chain_clone - clone a chain of bios up to a certain length.
751 * might return a bio_pair that will need to be released.
752 */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first unconsumed bio, *next at the
 * continuation point (second half of a split, or the next bio), and
 * the cloned chain covering exactly len bytes is returned (NULL on
 * allocation failure, with any partial clone already released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* Release any bio_pair left over from a previous call. */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* Only the first allocation may block/wait. */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
824
825/*
826 * helpers for osd request op vectors.
827 */
57cfc106
AE
/*
 * Allocate a zero-terminated array of num_ops osd request ops and
 * initialize the first with the given opcode and payload length.
 * Returns NULL on allocation failure.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	/* +1 so the array is terminated by a zeroed op. */
	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}
847
/* Counterpart of rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
852
1fec7093
YS
/*
 * Record completion of sub-request `index` of a collection and
 * complete, in order, every contiguous run of finished sub-requests
 * against the block-layer request.  With no collection the whole
 * request is completed at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* status[] and num_done are protected by the queue lock. */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* Extend max past every contiguous completed sub-request. */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Drop the per-sub-request reference on the collection. */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
890
/* Complete the sub-request described by an rbd_request. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
896
602adf40
YS
897/*
898 * Send ceph osd request
899 */
/*
 * Send ceph osd request
 *
 * Builds and submits one osd request for `object_name`.  With a
 * callback (rbd_cb) the request completes asynchronously and the
 * callback owns req/req_data teardown; without one this waits for
 * completion and optionally passes back the reassert version (*ver).
 * linger_req, when non-NULL, registers the request as lingering
 * (used for watch) and passes it back to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Still must complete our slot in the collection. */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* One object per "stripe": unit == object size, count == 1. */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous: wait here, then drop our request ref. */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1008
1009/*
1010 * Ceph osd op callback
1011 */
/*
 * Ceph osd op callback
 *
 * Async completion handler: translates read-specific results
 * (nonexistent object reads as zeroes; short reads are zero-filled)
 * and completes the sub-request, then frees the request state.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Reading a hole: the object doesn't exist yet. */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero-fill the remainder. */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1048
59c2be1e
YS
/* Minimal async callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1053
602adf40
YS
1054/*
1055 * Do a synchronous ceph osd operation
1056 */
0ce1a794 1057static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1058 struct ceph_snap_context *snapc,
1059 u64 snapid,
602adf40 1060 int flags,
913d2fdc 1061 struct ceph_osd_req_op *ops,
aded07ea 1062 const char *object_name,
602adf40 1063 u64 ofs, u64 len,
59c2be1e
YS
1064 char *buf,
1065 struct ceph_osd_request **linger_req,
1066 u64 *ver)
602adf40
YS
1067{
1068 int ret;
1069 struct page **pages;
1070 int num_pages;
913d2fdc
AE
1071
1072 BUG_ON(ops == NULL);
602adf40
YS
1073
1074 num_pages = calc_pages_for(ofs , len);
1075 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1076 if (IS_ERR(pages))
1077 return PTR_ERR(pages);
602adf40 1078
0ce1a794 1079 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1080 object_name, ofs, len, NULL,
602adf40
YS
1081 pages, num_pages,
1082 flags,
1083 ops,
1fec7093 1084 NULL, 0,
59c2be1e
YS
1085 NULL,
1086 linger_req, ver);
602adf40 1087 if (ret < 0)
913d2fdc 1088 goto done;
602adf40
YS
1089
1090 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1091 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1092
602adf40
YS
1093done:
1094 ceph_release_page_vector(pages, num_pages);
1095 return ret;
1096}
1097
1098/*
1099 * Do an asynchronous ceph osd operation
1100 */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent [ofs, ofs+len) onto its (single) object
 * segment, builds the op vector and submits it; completion is
 * reported through rbd_req_cb into the given collection slot.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* Only writes carry data in the request payload. */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1153
1154/*
1155 * Request async osd write
1156 */
1157static int rbd_req_write(struct request *rq,
1158 struct rbd_device *rbd_dev,
1159 struct ceph_snap_context *snapc,
1160 u64 ofs, u64 len,
1fec7093
YS
1161 struct bio *bio,
1162 struct rbd_req_coll *coll,
1163 int coll_index)
602adf40
YS
1164{
1165 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1166 CEPH_OSD_OP_WRITE,
1167 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1168 ofs, len, bio, coll, coll_index);
602adf40
YS
1169}
1170
1171/*
1172 * Request async osd read
1173 */
1174static int rbd_req_read(struct request *rq,
1175 struct rbd_device *rbd_dev,
1176 u64 snapid,
1177 u64 ofs, u64 len,
1fec7093
YS
1178 struct bio *bio,
1179 struct rbd_req_coll *coll,
1180 int coll_index)
602adf40
YS
1181{
1182 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1183 snapid,
602adf40
YS
1184 CEPH_OSD_OP_READ,
1185 CEPH_OSD_FLAG_READ,
1fec7093 1186 ofs, len, bio, coll, coll_index);
602adf40
YS
1187}
1188
1189/*
1190 * Request sync osd read
1191 */
0ce1a794 1192static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40 1193 u64 snapid,
aded07ea 1194 const char *object_name,
602adf40 1195 u64 ofs, u64 len,
59c2be1e
YS
1196 char *buf,
1197 u64 *ver)
602adf40 1198{
913d2fdc
AE
1199 struct ceph_osd_req_op *ops;
1200 int ret;
1201
1202 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1203 if (!ops)
1204 return -ENOMEM;
1205
1206 ret = rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1207 snapid,
602adf40 1208 CEPH_OSD_FLAG_READ,
913d2fdc
AE
1209 ops, object_name, ofs, len, buf, NULL, ver);
1210 rbd_destroy_ops(ops);
1211
1212 return ret;
602adf40
YS
1213}
1214
1215/*
59c2be1e
YS
1216 * Request sync osd watch
1217 */
0ce1a794 1218static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
59c2be1e 1219 u64 ver,
7f0a24d8 1220 u64 notify_id)
59c2be1e
YS
1221{
1222 struct ceph_osd_req_op *ops;
11f77002
SW
1223 int ret;
1224
57cfc106
AE
1225 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1226 if (!ops)
1227 return -ENOMEM;
59c2be1e 1228
a71b891b 1229 ops[0].watch.ver = cpu_to_le64(ver);
59c2be1e
YS
1230 ops[0].watch.cookie = notify_id;
1231 ops[0].watch.flag = 0;
1232
0ce1a794 1233 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
7f0a24d8 1234 rbd_dev->header_name, 0, 0, NULL,
ad4f232f 1235 NULL, 0,
59c2be1e
YS
1236 CEPH_OSD_FLAG_READ,
1237 ops,
1fec7093 1238 NULL, 0,
59c2be1e
YS
1239 rbd_simple_req_cb, 0, NULL);
1240
1241 rbd_destroy_ops(ops);
1242 return ret;
1243}
1244
1245static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1246{
0ce1a794 1247 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1248 u64 hver;
13143d2d
SW
1249 int rc;
1250
0ce1a794 1251 if (!rbd_dev)
59c2be1e
YS
1252 return;
1253
bd919d45
AE
1254 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1255 rbd_dev->header_name, (unsigned long long) notify_id,
1256 (unsigned int) opcode);
1fe5e993 1257 rc = rbd_refresh_header(rbd_dev, &hver);
13143d2d 1258 if (rc)
f0f8cef5 1259 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
0ce1a794 1260 " update snaps: %d\n", rbd_dev->major, rc);
59c2be1e 1261
7f0a24d8 1262 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1263}
1264
/*
 * Request sync osd watch.
 *
 * Registers a watch on the image header object so the OSD notifies us
 * (via rbd_watch_cb) when it changes.  Creates the osd event first,
 * then issues a WATCH op with flag=1 (register).  The request lingers
 * (watch_request) so the watch stays established.  On failure both the
 * event and the ops are torn down in reverse order.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1308
79e3057c
YS
1309/*
1310 * Request sync osd unwatch
1311 */
070c633f 1312static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
79e3057c
YS
1313{
1314 struct ceph_osd_req_op *ops;
57cfc106 1315 int ret;
79e3057c 1316
57cfc106
AE
1317 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1318 if (!ops)
1319 return -ENOMEM;
79e3057c
YS
1320
1321 ops[0].watch.ver = 0;
0ce1a794 1322 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
79e3057c
YS
1323 ops[0].watch.flag = 0;
1324
0ce1a794 1325 ret = rbd_req_sync_op(rbd_dev, NULL,
79e3057c 1326 CEPH_NOSNAP,
79e3057c
YS
1327 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1328 ops,
070c633f
AE
1329 rbd_dev->header_name,
1330 0, 0, NULL, NULL, NULL);
1331
79e3057c
YS
1332
1333 rbd_destroy_ops(ops);
0ce1a794
AE
1334 ceph_osdc_cancel_event(rbd_dev->watch_event);
1335 rbd_dev->watch_event = NULL;
79e3057c
YS
1336 return ret;
1337}
1338
59c2be1e 1339struct rbd_notify_info {
0ce1a794 1340 struct rbd_device *rbd_dev;
59c2be1e
YS
1341};
1342
1343static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1344{
0ce1a794
AE
1345 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1346 if (!rbd_dev)
59c2be1e
YS
1347 return;
1348
bd919d45
AE
1349 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1350 rbd_dev->header_name, (unsigned long long) notify_id,
1351 (unsigned int) opcode);
59c2be1e
YS
1352}
1353
/*
 * Request sync osd notify.
 *
 * Sends a NOTIFY op on the header object (so other watchers re-read
 * the header) and waits for completion of our own notification via a
 * one-shot osd event.
 *
 * NOTE(review): on the success path the event created here is never
 * passed to ceph_osdc_cancel_event() -- confirm ceph_osdc_wait_event()
 * releases a one-shot event, otherwise this leaks the event.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: version (u32) + timeout (u32) */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	/* one_shot == 1: event fires once, for our own notify completion */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1403
602adf40
YS
/*
 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL) on
 * @object_name, passing @data as input.  (The original comment here
 * said "Request sync osd read", which was a copy-paste error.)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	/* payload carries class name, method name and input data */
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1444
1fec7093
YS
1445static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1446{
1447 struct rbd_req_coll *coll =
1448 kzalloc(sizeof(struct rbd_req_coll) +
1449 sizeof(struct rbd_req_status) * num_reqs,
1450 GFP_ATOMIC);
1451
1452 if (!coll)
1453 return NULL;
1454 coll->total = num_reqs;
1455 kref_init(&coll->kref);
1456 return coll;
1457}
1458
602adf40
YS
/*
 * block device queue callback.
 *
 * Called with q->queue_lock held.  For each fetched request we drop
 * the queue lock while talking to the OSDs and retake it before
 * looping (the lock-drop/retake ordering below is load-bearing).
 * Each request is split along object-segment boundaries; a
 * rbd_req_coll collects the per-segment completions so the block
 * request finishes only when all segments have.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* OSD I/O may sleep; cannot hold the queue lock across it */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* mapped snapshot may have been deleted underneath us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* pin the snap context so a concurrent refresh can't free it */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* fail just this segment; others proceed */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop our ref; in-flight segments hold their own */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1578
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;	/* object size in sectors (power of 2) */
	sector_t sector;
	unsigned int bio_sectors;	/* sectors already in the bio */
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining in the object after the bio's current end */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* always accept the first bvec of an empty bio, even if oversized */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1605
1606static void rbd_free_disk(struct rbd_device *rbd_dev)
1607{
1608 struct gendisk *disk = rbd_dev->disk;
1609
1610 if (!disk)
1611 return;
1612
1613 rbd_header_free(&rbd_dev->header);
1614
1615 if (disk->flags & GENHD_FL_UP)
1616 del_gendisk(disk);
1617 if (disk->queue)
1618 blk_cleanup_queue(disk->queue);
1619 put_disk(disk);
1620}
1621
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header. Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings. Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* free the previous (too small) attempt, if any */
		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		/* loop until the on-disk snapshot count is stable */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1693
1694/*
1695 * reload the ondisk the header
1696 */
1697static int rbd_read_header(struct rbd_device *rbd_dev,
1698 struct rbd_image_header *header)
1699{
1700 struct rbd_image_header_ondisk *ondisk;
1701 u64 ver = 0;
1702 int ret;
602adf40 1703
4156d998
AE
1704 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1705 if (IS_ERR(ondisk))
1706 return PTR_ERR(ondisk);
1707 ret = rbd_header_from_disk(header, ondisk);
1708 if (ret >= 0)
1709 header->obj_version = ver;
1710 kfree(ondisk);
1711
1712 return ret;
602adf40
YS
1713}
1714
1715/*
1716 * create a snapshot
1717 */
0ce1a794 1718static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1719 const char *snap_name,
1720 gfp_t gfp_flags)
1721{
1722 int name_len = strlen(snap_name);
1723 u64 new_snapid;
1724 int ret;
916d4d67 1725 void *data, *p, *e;
1dbb4399 1726 struct ceph_mon_client *monc;
602adf40
YS
1727
1728 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1729 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1730 return -EINVAL;
1731
0ce1a794
AE
1732 monc = &rbd_dev->rbd_client->client->monc;
1733 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1734 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1735 if (ret < 0)
1736 return ret;
1737
1738 data = kmalloc(name_len + 16, gfp_flags);
1739 if (!data)
1740 return -ENOMEM;
1741
916d4d67
SW
1742 p = data;
1743 e = data + name_len + 16;
602adf40 1744
916d4d67
SW
1745 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1746 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1747
0bed54dc 1748 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1749 "rbd", "snap_add",
d67d4be5 1750 data, p - data, NULL);
602adf40 1751
916d4d67 1752 kfree(data);
602adf40 1753
505cbb9b 1754 return ret < 0 ? ret : 0;
602adf40
YS
1755bad:
1756 return -ERANGE;
1757}
1758
dfc5606d
YS
1759static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1760{
1761 struct rbd_snap *snap;
a0593290 1762 struct rbd_snap *next;
dfc5606d 1763
a0593290 1764 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1765 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1766}
1767
602adf40
YS
/*
 * only read the first part of the ondisk header, without the snaps info.
 *
 * Re-reads the header and swaps the new snapshot metadata into
 * rbd_dev->header under header_rwsem.  The free-then-replace ordering
 * below matters: old snap arrays are freed before the new ones are
 * installed, and the old snapc is released via its refcount because
 * in-flight osd requests may still hold it.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* reconcile the sysfs snapshot devices with the new context */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1814
1fe5e993
AE
1815static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1816{
1817 int ret;
1818
1819 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1820 ret = __rbd_refresh_header(rbd_dev, hver);
1821 mutex_unlock(&ctl_mutex);
1822
1823 return ret;
1824}
1825
602adf40
YS
/*
 * Create and announce the block device for an rbd image: read the
 * header, populate the snapshot list, select the mapped snapshot,
 * then allocate the gendisk and request queue, size the queue limits
 * to the object size, and add_disk().  Called before the device is
 * registered, so the snapshot list needs no locking here.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning object boundaries */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1898
dfc5606d
YS
/*
 sysfs
*/

/* Map a sysfs struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1907
dfc5606d
YS
1908static ssize_t rbd_size_show(struct device *dev,
1909 struct device_attribute *attr, char *buf)
1910{
593a9e7b 1911 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1912 sector_t size;
1913
1914 down_read(&rbd_dev->header_rwsem);
1915 size = get_capacity(rbd_dev->disk);
1916 up_read(&rbd_dev->header_rwsem);
dfc5606d 1917
a51aa0c0 1918 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1919}
1920
1921static ssize_t rbd_major_show(struct device *dev,
1922 struct device_attribute *attr, char *buf)
1923{
593a9e7b 1924 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1925
dfc5606d
YS
1926 return sprintf(buf, "%d\n", rbd_dev->major);
1927}
1928
1929static ssize_t rbd_client_id_show(struct device *dev,
1930 struct device_attribute *attr, char *buf)
602adf40 1931{
593a9e7b 1932 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1933
1dbb4399
AE
1934 return sprintf(buf, "client%lld\n",
1935 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1936}
1937
dfc5606d
YS
1938static ssize_t rbd_pool_show(struct device *dev,
1939 struct device_attribute *attr, char *buf)
602adf40 1940{
593a9e7b 1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1942
1943 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1944}
1945
9bb2f334
AE
1946static ssize_t rbd_pool_id_show(struct device *dev,
1947 struct device_attribute *attr, char *buf)
1948{
1949 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1952}
1953
dfc5606d
YS
1954static ssize_t rbd_name_show(struct device *dev,
1955 struct device_attribute *attr, char *buf)
1956{
593a9e7b 1957 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1958
0bed54dc 1959 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1960}
1961
1962static ssize_t rbd_snap_show(struct device *dev,
1963 struct device_attribute *attr,
1964 char *buf)
1965{
593a9e7b 1966 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1967
1968 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1969}
1970
1971static ssize_t rbd_image_refresh(struct device *dev,
1972 struct device_attribute *attr,
1973 const char *buf,
1974 size_t size)
1975{
593a9e7b 1976 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 1977 int ret;
602adf40 1978
1fe5e993 1979 ret = rbd_refresh_header(rbd_dev, NULL);
b813623a
AE
1980
1981 return ret < 0 ? ret : size;
dfc5606d 1982}
602adf40 1983
dfc5606d
YS
/* Per-device sysfs attributes (see Documentation/ABI/testing/sysfs-bus-rbd) */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Empty release: rbd_device lifetime is managed elsewhere (bus release) */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2025
2026
/*
 sysfs - snapshots
*/

/* sysfs: snapshot size in bytes */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

/* sysfs: snapshot id */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Frees the rbd_snap when its device refcount drops to zero */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2078
/*
 * Detach a snapshot from the device list and unregister its sysfs
 * device; the rbd_snap itself is freed by rbd_snap_dev_release when
 * the last reference drops.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2084
14e7085d 2085static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2086 struct device *parent)
2087{
2088 struct device *dev = &snap->dev;
2089 int ret;
2090
2091 dev->type = &rbd_snap_device_type;
2092 dev->parent = parent;
2093 dev->release = rbd_snap_dev_release;
2094 dev_set_name(dev, "snap_%s", snap->name);
2095 ret = device_register(dev);
2096
2097 return ret;
2098}
2099
4e891e0a
AE
/*
 * Allocate an rbd_snap for slot @i of the device's snapshot context,
 * duplicating @name, and register its sysfs device if the parent rbd
 * device is already registered.  Returns the new snap or an ERR_PTR.
 * On failure the partially-built snap is freed here.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	/* defer sysfs registration until the parent device exists */
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2131
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context. Remove any existing snapshots
 * not present in the new snapshot context. Add a new snapshot for
 * any snaphots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 *
 * Implementation: a classic two-cursor merge -- @index walks the new
 * context, @links walks the existing list; both advance in the same
 * (descending id) order.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		/* CEPH_NOSNAP marks "no more entries" on either side */
		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		BUG_ON(snap && snap->id == CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			/* mapped snapshot vanished: mark it gone for I/O */
			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
			BUG_ON(strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2213
/*
 * Register the rbd device, and a child device for each of its
 * current snapshots, with the driver core under the rbd bus.
 * Holds ctl_mutex for the duration.  Returns 0 on success or a
 * negative errno.
 *
 * NOTE(review): if registering one of the snapshot devices fails
 * partway through the loop, the rbd device itself stays registered
 * and earlier snapshot devices are not unwound here -- presumably
 * the caller's error path (rbd_bus_del_dev) handles that; confirm.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* final put on this device invokes rbd_dev_release() */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2241
/*
 * Unregister the rbd device from the driver core.  The final
 * reference drop triggers rbd_dev_release() (installed as the
 * device's ->release in rbd_bus_add_dev()), which performs the
 * remaining teardown.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2246
59c2be1e
YS
2247static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2248{
2249 int ret, rc;
2250
2251 do {
0e6f322d 2252 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2253 if (ret == -ERANGE) {
1fe5e993 2254 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2255 if (rc < 0)
2256 return rc;
2257 }
2258 } while (ret == -ERANGE);
2259
2260 return ret;
2261}
2262
/* Highest rbd device id handed out so far; ids start at 1 */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	/* atomic increment makes the id allocation race-free */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2277
1ddbe94e 2278/*
499afd5b
AE
2279 * Remove an rbd_dev from the global list, and record that its
2280 * identifier is no longer in use.
1ddbe94e 2281 */
499afd5b 2282static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2283{
d184f6bf 2284 struct list_head *tmp;
de71a297 2285 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2286 int max_id;
2287
2288 BUG_ON(rbd_id < 1);
499afd5b
AE
2289
2290 spin_lock(&rbd_dev_list_lock);
2291 list_del_init(&rbd_dev->node);
d184f6bf
AE
2292
2293 /*
2294 * If the id being "put" is not the current maximum, there
2295 * is nothing special we need to do.
2296 */
2297 if (rbd_id != atomic64_read(&rbd_id_max)) {
2298 spin_unlock(&rbd_dev_list_lock);
2299 return;
2300 }
2301
2302 /*
2303 * We need to update the current maximum id. Search the
2304 * list to find out what it is. We're more likely to find
2305 * the maximum at the end, so search the list backward.
2306 */
2307 max_id = 0;
2308 list_for_each_prev(tmp, &rbd_dev_list) {
2309 struct rbd_device *rbd_dev;
2310
2311 rbd_dev = list_entry(tmp, struct rbd_device, node);
2312 if (rbd_id > max_id)
2313 max_id = rbd_id;
2314 }
499afd5b 2315 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2316
1ddbe94e 2317 /*
d184f6bf
AE
2318 * The max id could have been updated by rbd_id_get(), in
2319 * which case it now accurately reflects the new maximum.
2320 * Be careful not to overwrite the maximum value in that
2321 * case.
1ddbe94e 2322 */
d184f6bf 2323 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2324}
2325
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char delims[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, delims);		/* skip leading whitespace */
	*buf = p;			/* start of token (or trailing NUL) */

	return strcspn(p, delims);	/* token length */
}
2344
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* consume the token even if it didn't fit */

	return len;
}
2374
ea3352f4
AE
2375/*
2376 * Finds the next token in *buf, dynamically allocates a buffer big
2377 * enough to hold a copy of it, and copies the token into the new
2378 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2379 * that a duplicate buffer is created even for a zero-length token.
2380 *
2381 * Returns a pointer to the newly-allocated duplicate, or a null
2382 * pointer if memory for the duplicate was not available. If
2383 * the lenp argument is a non-null pointer, the length of the token
2384 * (not including the '\0') is returned in *lenp.
2385 *
2386 * If successful, the *buf pointer will be updated to point beyond
2387 * the end of the found token.
2388 *
2389 * Note: uses GFP_KERNEL for allocation.
2390 */
2391static inline char *dup_token(const char **buf, size_t *lenp)
2392{
2393 char *dup;
2394 size_t len;
2395
2396 len = next_token(buf);
2397 dup = kmalloc(len + 1, GFP_KERNEL);
2398 if (!dup)
2399 return NULL;
2400
2401 memcpy(dup, *buf, len);
2402 *(dup + len) = '\0';
2403 *buf += len;
2404
2405 if (lenp)
2406 *lenp = len;
2407
2408 return dup;
2409}
2410
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Returns 0 on success; -EINVAL for a missing/over-long required
 * token, or -ENOMEM on allocation failure (in which case every
 * field set here is freed and reset, so the caller need not clean
 * them up).
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	/* not copied -- points into the caller's buf */
	*mon_addrs = buf;

	buf += len;

	/* options are copied into the caller-supplied buffer */
	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* free in reverse order of allocation, NULLing each pointer
	 * so a later release path doesn't double-free */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2493
59c2be1e
YS
2494static ssize_t rbd_add(struct bus_type *bus,
2495 const char *buf,
2496 size_t count)
602adf40 2497{
cb8627c7
AE
2498 char *options;
2499 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2500 const char *mon_addrs = NULL;
2501 size_t mon_addrs_size = 0;
27cc2594
AE
2502 struct ceph_osd_client *osdc;
2503 int rc = -ENOMEM;
602adf40
YS
2504
2505 if (!try_module_get(THIS_MODULE))
2506 return -ENODEV;
2507
60571c7d 2508 options = kmalloc(count, GFP_KERNEL);
602adf40 2509 if (!options)
27cc2594 2510 goto err_nomem;
cb8627c7
AE
2511 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2512 if (!rbd_dev)
2513 goto err_nomem;
602adf40
YS
2514
2515 /* static rbd_device initialization */
2516 spin_lock_init(&rbd_dev->lock);
2517 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2518 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2519 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2520
d184f6bf 2521 /* generate unique id: find highest unique id, add one */
499afd5b 2522 rbd_id_get(rbd_dev);
602adf40 2523
a725f65e 2524 /* Fill in the device name, now that we have its id. */
81a89793
AE
2525 BUILD_BUG_ON(DEV_NAME_LEN
2526 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
de71a297 2527 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
a725f65e 2528
602adf40 2529 /* parse add command */
7ef3214a 2530 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2531 options, count);
a725f65e 2532 if (rc)
f0f8cef5 2533 goto err_put_id;
e124a82f 2534
f8c38929
AE
2535 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2536 if (rc < 0)
f0f8cef5 2537 goto err_put_id;
602adf40 2538
602adf40 2539 /* pick the pool */
1dbb4399 2540 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2541 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2542 if (rc < 0)
2543 goto err_out_client;
9bb2f334 2544 rbd_dev->pool_id = rc;
602adf40
YS
2545
2546 /* register our block device */
27cc2594
AE
2547 rc = register_blkdev(0, rbd_dev->name);
2548 if (rc < 0)
602adf40 2549 goto err_out_client;
27cc2594 2550 rbd_dev->major = rc;
602adf40 2551
dfc5606d
YS
2552 rc = rbd_bus_add_dev(rbd_dev);
2553 if (rc)
766fc439
YS
2554 goto err_out_blkdev;
2555
32eec68d
AE
2556 /*
2557 * At this point cleanup in the event of an error is the job
2558 * of the sysfs code (initiated by rbd_bus_del_dev()).
2559 *
2560 * Set up and announce blkdev mapping.
2561 */
602adf40
YS
2562 rc = rbd_init_disk(rbd_dev);
2563 if (rc)
766fc439 2564 goto err_out_bus;
602adf40 2565
59c2be1e
YS
2566 rc = rbd_init_watch_dev(rbd_dev);
2567 if (rc)
2568 goto err_out_bus;
2569
602adf40
YS
2570 return count;
2571
766fc439 2572err_out_bus:
766fc439
YS
2573 /* this will also clean up rest of rbd_dev stuff */
2574
2575 rbd_bus_del_dev(rbd_dev);
2576 kfree(options);
766fc439
YS
2577 return rc;
2578
602adf40
YS
2579err_out_blkdev:
2580 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2581err_out_client:
2582 rbd_put_client(rbd_dev);
f0f8cef5 2583err_put_id:
cb8627c7 2584 if (rbd_dev->pool_name) {
820a5f3e 2585 kfree(rbd_dev->snap_name);
0bed54dc
AE
2586 kfree(rbd_dev->header_name);
2587 kfree(rbd_dev->image_name);
cb8627c7
AE
2588 kfree(rbd_dev->pool_name);
2589 }
499afd5b 2590 rbd_id_put(rbd_dev);
27cc2594 2591err_nomem:
27cc2594 2592 kfree(rbd_dev);
cb8627c7 2593 kfree(options);
27cc2594 2594
602adf40
YS
2595 dout("Error adding device %s\n", buf);
2596 module_put(THIS_MODULE);
27cc2594
AE
2597
2598 return (ssize_t) rc;
602adf40
YS
2599}
2600
de71a297 2601static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2602{
2603 struct list_head *tmp;
2604 struct rbd_device *rbd_dev;
2605
e124a82f 2606 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2607 list_for_each(tmp, &rbd_dev_list) {
2608 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2609 if (rbd_dev->dev_id == dev_id) {
e124a82f 2610 spin_unlock(&rbd_dev_list_lock);
602adf40 2611 return rbd_dev;
e124a82f 2612 }
602adf40 2613 }
e124a82f 2614 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2615 return NULL;
2616}
2617
/*
 * Device-model ->release callback for an rbd device (installed in
 * rbd_bus_add_dev()); runs when the last reference to the struct
 * device is dropped.  Tears down the watch, the ceph client, the
 * block device, and finally the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* stop watching the header object before dropping the client */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2648
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device
 * id, find the matching mapped device, and tear it down (snapshots
 * first, then the device itself).  Returns count on success,
 * -ENOENT if no such device, -EINVAL for an out-of-range id, or
 * the parse error from strict_strtoul.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* ctl_mutex serializes against add/refresh paths */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2682
/*
 * Handle a write to a device's snap_create sysfs attribute: create
 * a snapshot with the written name, refresh our header copy, and
 * notify watchers.  Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * Copies at most count - 1 characters plus a NUL, i.e. drops
	 * the final character of buf.  NOTE(review): this assumes the
	 * sysfs write ends with a newline; a name written without one
	 * silently loses its last character -- confirm intended.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2723
602adf40
YS
2724/*
2725 * create control files in sysfs
dfc5606d 2726 * /sys/bus/rbd/...
602adf40
YS
2727 */
2728static int rbd_sysfs_init(void)
2729{
dfc5606d 2730 int ret;
602adf40 2731
fed4c143 2732 ret = device_register(&rbd_root_dev);
21079786 2733 if (ret < 0)
dfc5606d 2734 return ret;
602adf40 2735
fed4c143
AE
2736 ret = bus_register(&rbd_bus_type);
2737 if (ret < 0)
2738 device_unregister(&rbd_root_dev);
602adf40 2739
602adf40
YS
2740 return ret;
2741}
2742
/*
 * Remove the sysfs control files: unregister the bus first, then
 * the root device -- the reverse of rbd_sysfs_init().
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2748
2749int __init rbd_init(void)
2750{
2751 int rc;
2752
2753 rc = rbd_sysfs_init();
2754 if (rc)
2755 return rc;
f0f8cef5 2756 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2757 return 0;
2758}
2759
/* Module exit point: tear down the sysfs control files. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2764
/* Module registration and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");