ceph: define snap counts as u32 everywhere
[linux-2.6-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
59c2be1e
YS
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes */
	char *object_prefix;	/* NUL-terminated data-object name prefix */
	__u8 obj_order;		/* log2 of bytes per data object (segment) */
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;	/* snapshot ids for this image */
	size_t snap_names_len;	/* total bytes in the snap_names buffer */
	u32 total_snaps;	/* snapshot count; mirrors snapc->num_snaps */

	char *snap_names;	/* NUL-separated snapshot names, in snap order */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* version of the on-disk header object */
};
92
/* rbd-specific mount options (see rbd_opts_tokens). */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
/*
 * an instance of the client. multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;	/* shared ceph client handle */
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry on global rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* non-zero once this sub-request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* number of bytes transferred */
};
115
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;			/* number of sub-request slots */
	int num_done;			/* completed so far, in slot order */
	struct kref kref;		/* released via rbd_coll_release() */
	struct rbd_req_status status[0];	/* one slot per sub-request */
};
125
f0f8cef5
AE
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* request length in bytes */
	int coll_index;			/* slot index in the collection below */
	struct rbd_req_coll *coll;	/* parent collection, may be NULL */
};
137
dfc5606d
YS
/* In-memory record of a single snapshot, exposed through sysfs. */
struct rbd_snap {
	struct device dev;	/* sysfs device node */
	const char *name;	/* snapshot name */
	u64 size;		/* image size at snapshot time */
	struct list_head node;	/* entry on rbd_device->snaps */
	u64 id;			/* snapshot id */
};
145
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* possibly shared ceph client */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char *image_name;	/* rbd image this device maps */
	size_t image_name_len;
	char *header_name;	/* name of the image's header object */
	char *pool_name;
	int pool_id;

	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	/* name of the snapshot this device reads from */
	char *snap_name;
	/* id of the snapshot this device reads from */
	u64 snap_id;		/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool snap_exists;
	int read_only;

	struct list_head node;	/* entry on global rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
190
602adf40 191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 192
602adf40 193static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
432b8587
AE
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 198
dfc5606d
YS
199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200static void rbd_dev_release(struct device *dev);
dfc5606d
YS
201static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
203 const char *buf,
204 size_t count);
205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 206 struct rbd_snap *snap);
dfc5606d 207
f0f8cef5
AE
208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
212
/* Bus attributes: the write-only add/remove control files on the rbd bus. */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name = "rbd",
	.bus_attrs = rbd_bus_attrs,
};

/* Empty release: rbd_root_dev below is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent device under which all rbd devices are registered. */
static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
232
dfc5606d 233
dfc5606d
YS
234static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235{
236 return get_device(&rbd_dev->dev);
237}
238
239static void rbd_put_dev(struct rbd_device *rbd_dev)
240{
241 put_device(&rbd_dev->dev);
242}
602adf40 243
263c6ca0 244static int __rbd_refresh_header(struct rbd_device *rbd_dev);
59c2be1e 245
602adf40
YS
246static int rbd_open(struct block_device *bdev, fmode_t mode)
247{
f0f8cef5 248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 249
dfc5606d
YS
250 rbd_get_dev(rbd_dev);
251
602adf40
YS
252 set_device_ro(bdev, rbd_dev->read_only);
253
254 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255 return -EROFS;
256
257 return 0;
258}
259
dfc5606d
YS
260static int rbd_release(struct gendisk *disk, fmode_t mode)
261{
262 struct rbd_device *rbd_dev = disk->private_data;
263
264 rbd_put_dev(rbd_dev);
265
266 return 0;
267}
268
602adf40
YS
/* Block-device entry points for /dev/rbd<N>. */
static const struct block_device_operations rbd_bd_ops = {
	.owner = THIS_MODULE,
	.open = rbd_open,
	.release = rbd_release,
};
274
275/*
276 * Initialize an rbd client instance.
43ae4701 277 * We own *ceph_opts.
602adf40 278 */
43ae4701 279static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 280 struct rbd_options *rbd_opts)
602adf40
YS
281{
282 struct rbd_client *rbdc;
283 int ret = -ENOMEM;
284
285 dout("rbd_client_create\n");
286 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 if (!rbdc)
288 goto out_opt;
289
290 kref_init(&rbdc->kref);
291 INIT_LIST_HEAD(&rbdc->node);
292
bc534d86
AE
293 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
43ae4701 295 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 296 if (IS_ERR(rbdc->client))
bc534d86 297 goto out_mutex;
43ae4701 298 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
299
300 ret = ceph_open_session(rbdc->client);
301 if (ret < 0)
302 goto out_err;
303
59c2be1e
YS
304 rbdc->rbd_opts = rbd_opts;
305
432b8587 306 spin_lock(&rbd_client_list_lock);
602adf40 307 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 308 spin_unlock(&rbd_client_list_lock);
602adf40 309
bc534d86
AE
310 mutex_unlock(&ctl_mutex);
311
602adf40
YS
312 dout("rbd_client_create created %p\n", rbdc);
313 return rbdc;
314
315out_err:
316 ceph_destroy_client(rbdc->client);
bc534d86
AE
317out_mutex:
318 mutex_unlock(&ctl_mutex);
602adf40
YS
319 kfree(rbdc);
320out_opt:
43ae4701
AE
321 if (ceph_opts)
322 ceph_destroy_options(ceph_opts);
28f259b7 323 return ERR_PTR(ret);
602adf40
YS
324}
325
326/*
327 * Find a ceph client with specific addr and configuration.
328 */
43ae4701 329static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
330{
331 struct rbd_client *client_node;
332
43ae4701 333 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
334 return NULL;
335
336 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 337 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
338 return client_node;
339 return NULL;
340}
341
59c2be1e
YS
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
359
360static int parse_rbd_opts_token(char *c, void *private)
361{
43ae4701 362 struct rbd_options *rbd_opts = private;
59c2be1e
YS
363 substring_t argstr[MAX_OPT_ARGS];
364 int token, intval, ret;
365
43ae4701 366 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
367 if (token < 0)
368 return -EINVAL;
369
370 if (token < Opt_last_int) {
371 ret = match_int(&argstr[0], &intval);
372 if (ret < 0) {
373 pr_err("bad mount option arg (not int) "
374 "at '%s'\n", c);
375 return ret;
376 }
377 dout("got int token %d val %d\n", token, intval);
378 } else if (token > Opt_last_int && token < Opt_last_string) {
379 dout("got string token %d val %s\n", token,
380 argstr[0].from);
381 } else {
382 dout("got token %d\n", token);
383 }
384
385 switch (token) {
386 case Opt_notify_timeout:
43ae4701 387 rbd_opts->notify_timeout = intval;
59c2be1e
YS
388 break;
389 default:
390 BUG_ON(token);
391 }
392 return 0;
393}
394
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success the returned client holds a reference the caller must
 * drop with rbd_put_client().  Returns ERR_PTR() on failure.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* parse_rbd_opts_token fills in rbd_opts as a side effect */
	ceph_opts = ceph_parse_options(options, mon_addr,
				       mon_addr + mon_addr_len,
				       parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the existing client owns its own copies of both */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/*
	 * NOTE(review): the list lock is dropped before creating the new
	 * client, so two concurrent mounts could each create a client for
	 * the same options instead of sharing one — confirm this is an
	 * accepted (harmless) race.
	 */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	/* rbd_client_create consumed ceph_opts; rbd_opts is ours on error */
	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
442
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT already hold it.  (The previous comment claiming the
 * caller must hold the lock was stale.)
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
461
462/*
463 * Drop reference to ceph client node. If it's not referenced anymore, release
464 * it.
465 */
466static void rbd_put_client(struct rbd_device *rbd_dev)
467{
468 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
469 rbd_dev->rbd_client = NULL;
602adf40
YS
470}
471
1fec7093
YS
472/*
473 * Destroy requests collection
474 */
475static void rbd_coll_release(struct kref *kref)
476{
477 struct rbd_req_coll *coll =
478 container_of(kref, struct rbd_req_coll, kref);
479
480 dout("rbd_coll_release %p\n", coll);
481 kfree(coll);
482}
602adf40
YS
483
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @allocated_snaps is how many snapshot slots the caller's on-disk buffer
 * holds; snapshot ids/sizes/names are only copied when it matches the
 * header's own snap_count (otherwise the caller re-reads with a bigger
 * buffer).  Returns 0, -ENXIO on bad magic, -EINVAL on an absurd snap
 * count, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				u32 allocated_snaps,
				gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* validate the header magic before trusting any field */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* guard the snapc allocation size computation against overflow */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	/*
	 * NOTE(review): snap_names_len comes straight off disk and is not
	 * range-checked before being used as an allocation/copy size —
	 * confirm the caller bounds it.
	 */
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					gfp_flags);
	if (!header->object_prefix)
		goto err_sizes;

	/* block_name on disk may not be NUL-terminated; terminate our copy */
	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/* only copy snapshot data when the caller's buffer held all of it */
	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (packed right after the snap array) */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
565
602adf40
YS
566static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567 u64 *seq, u64 *size)
568{
569 int i;
570 char *p = header->snap_names;
571
00f1f36f
AE
572 for (i = 0; i < header->total_snaps; i++) {
573 if (!strcmp(snap_name, p)) {
602adf40 574
00f1f36f 575 /* Found it. Pass back its id and/or size */
602adf40 576
00f1f36f
AE
577 if (seq)
578 *seq = header->snapc->snaps[i];
579 if (size)
580 *size = header->snap_sizes[i];
581 return i;
582 }
583 p += strlen(p) + 1; /* Skip ahead to the next name */
584 }
585 return -ENOENT;
602adf40
YS
586}
587
/*
 * Point the device at the snapshot named in rbd_dev->snap_name.
 *
 * Mapping the special head name makes the device writable at CEPH_NOSNAP;
 * any other name resolves to a snapshot id and forces read-only.  Takes
 * header_rwsem for write; optionally returns the mapped size via *size.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the image head: writable, no snapshot id */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
				   &snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
618
619static void rbd_header_free(struct rbd_image_header *header)
620{
849b4260 621 kfree(header->object_prefix);
602adf40 622 kfree(header->snap_sizes);
849b4260 623 kfree(header->snap_names);
d1d25646 624 ceph_put_snap_context(header->snapc);
602adf40
YS
625}
626
627/*
628 * get the actual striped segment name, offset and length
629 */
630static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 631 const char *object_prefix,
602adf40
YS
632 u64 ofs, u64 len,
633 char *seg_name, u64 *segofs)
634{
635 u64 seg = ofs >> header->obj_order;
636
637 if (seg_name)
638 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 639 "%s.%012llx", object_prefix, seg);
602adf40
YS
640
641 ofs = ofs & ((1 << header->obj_order) - 1);
642 len = min_t(u64, len, (1 << header->obj_order) - ofs);
643
644 if (segofs)
645 *segofs = ofs;
646
647 return len;
648}
649
1fec7093
YS
650static int rbd_get_num_segments(struct rbd_image_header *header,
651 u64 ofs, u64 len)
652{
653 u64 start_seg = ofs >> header->obj_order;
654 u64 end_seg = (ofs + len - 1) >> header->obj_order;
655 return end_seg - start_seg + 1;
656}
657
029bcbd8
JD
658/*
659 * returns the size of an object in the image
660 */
661static u64 rbd_obj_bytes(struct rbd_image_header *header)
662{
663 return 1 << header->obj_order;
664}
665
602adf40
YS
666/*
667 * bio helpers
668 */
669
670static void bio_chain_put(struct bio *chain)
671{
672 struct bio *tmp;
673
674 while (chain) {
675 tmp = chain;
676 chain = chain->bi_next;
677 bio_put(tmp);
678 }
679}
680
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; once the running byte
 * position passes start_ofs, the remainder of each page fragment is
 * cleared.  Used to zero-fill short or missing reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset from the start of the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero only the part at/after start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
707
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return, *old points at the first un-cloned bio, *next at where the
 * next clone pass should resume (possibly the second half of a split),
 * and *bp at any bio_pair created by splitting.  Returns the new chain,
 * or NULL on allocation/split failure (partial clones are released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation in the loop may sleep */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
783
784/*
785 * helpers for osd request op vectors.
786 */
787static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
788 int num_ops,
789 int opcode,
790 u32 payload_len)
791{
792 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
793 GFP_NOIO);
794 if (!*ops)
795 return -ENOMEM;
796 (*ops)[0].op = opcode;
797 /*
798 * op extent offset and length will be set later on
799 * in calc_raw_layout()
800 */
801 (*ops)[0].payload_len = payload_len;
802 return 0;
803}
804
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
809
1fec7093
YS
/*
 * Record completion of sub-request @index in @coll and complete, in
 * order, every finished prefix of the collection against the block
 * request.  With no collection the whole request is ended directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* uncollected request: complete it in one shot */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes status[] updates and request completion */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* advance past every contiguously-finished slot */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed slot drops one collection reference */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
847
848static void rbd_coll_end_req(struct rbd_request *req,
849 int ret, u64 len)
850{
851 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
852}
853
602adf40
YS
/*
 * Send ceph osd request
 *
 * Builds and submits one osd request for @object_name covering
 * [ofs, ofs+len).  Data travels via @bio or @pages.  With a callback
 * the request completes asynchronously (the callback owns cleanup);
 * without one this waits for completion and optionally reports the
 * reassert version via *ver.  @coll/@coll_index tie the request to a
 * completion collection; @linger_req, when set, registers the request
 * as lingering and returns it to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* still account the failed slot in the collection */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
	     (unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* one object per "stripe": unit and object size both maximal */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous: wait here and drop our request reference */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
		     (unsigned long long)
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
965
/*
 * Ceph osd op callback
 *
 * Completion handler for async requests issued by rbd_do_op().  Decodes
 * the reply, zero-fills short or missing reads, completes the rbd
 * sub-request, and releases the osd request and our private data.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
	     (unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* object doesn't exist: a read of it is all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1005
59c2be1e
YS
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1010
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Transfers data through a freshly allocated page vector: @buf is
 * copied in before a write and copied out after a read.  When
 * @orig_ops is NULL a single-op vector for @opcode is built here (and
 * freed here); otherwise the caller's ops are used as-is.  Returns the
 * operation result (bytes read for reads) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			/* stage outgoing data into the page vector */
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* no callback: rbd_do_request waits for completion */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		/* ret is the number of bytes actually read */
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1071
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image byte range [ofs, ofs+len) onto its single containing
 * segment object and submits one async osd op for it; rbd_req_cb()
 * completes the request.  Callers must have already split requests on
 * segment boundaries (see BUG_ON below).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1126
1127/*
1128 * Request async osd write
1129 */
1130static int rbd_req_write(struct request *rq,
1131 struct rbd_device *rbd_dev,
1132 struct ceph_snap_context *snapc,
1133 u64 ofs, u64 len,
1fec7093
YS
1134 struct bio *bio,
1135 struct rbd_req_coll *coll,
1136 int coll_index)
602adf40
YS
1137{
1138 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1139 CEPH_OSD_OP_WRITE,
1140 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1141 ofs, len, bio, coll, coll_index);
602adf40
YS
1142}
1143
/*
 * Request async osd read
 *
 * Thin wrapper around rbd_do_op(): issues an asynchronous OSD READ of
 * [ofs, ofs+len) at the given snapshot id (no snapshot context is
 * needed for reads).  Completion is reported through coll/coll_index.
 */
static int rbd_req_read(struct request *rq,
                         struct rbd_device *rbd_dev,
                         u64 snapid,
                         u64 ofs, u64 len,
                         struct bio *bio,
                         struct rbd_req_coll *coll,
                         int coll_index)
{
        return rbd_do_op(rq, rbd_dev, NULL,
                         snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         ofs, len, bio, coll, coll_index);
}
1161
/*
 * Request sync osd read
 *
 * Synchronously read [ofs, ofs+len) of @object_name at @snapid into
 * @buf; the object version is returned through @ver.
 *
 * NOTE(review): the @snapc argument is accepted but never used (NULL
 * is passed to rbd_req_sync_op()) -- candidate for removal.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name,
                          u64 ofs, u64 len,
                          char *buf,
                          u64 *ver)
{
        return rbd_req_sync_op(rbd_dev, NULL,
                               snapid,
                               CEPH_OSD_OP_READ,
                               CEPH_OSD_FLAG_READ,
                               NULL,
                               object_name, ofs, len, buf, NULL, ver);
}
1180
/*
 * Request sync osd notify acknowledgement
 *
 * Acknowledge a notification received on the header object's watch,
 * so the notifying client's synchronous notify can complete.  The ack
 * is sent as a NOTIFY_ACK op via an async rbd_do_request() with a
 * simple completion callback (no data transfer).
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id,
                                   const char *object_name)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (ret < 0)
                return ret;

        ops[0].watch.ver = cpu_to_le64(ver);
        /*
         * NOTE(review): notify_id is stored without cpu_to_le64(),
         * unlike ver above -- presumably it is already wire-order as
         * delivered by the watch callback; confirm against the osd
         * client before "fixing".
         */
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          object_name, 0, 0, NULL,
                          NULL, 0,
                          CEPH_OSD_FLAG_READ,
                          ops,
                          NULL, 0,
                          rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}
1211
/*
 * Callback invoked by the osd client when a watch notification
 * arrives for this device's header object: re-read the header under
 * ctl_mutex, then acknowledge the notification (even if the refresh
 * failed, so the notifier is not left waiting).
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        rc = __rbd_refresh_header(rbd_dev);
        /* pick up the version the refresh (if any) just installed */
        hver = rbd_dev->header.obj_version;
        mutex_unlock(&ctl_mutex);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);

        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
}
1234
/*
 * Request sync osd watch
 *
 * Register a watch on @object_name (the image header object) so we
 * get rbd_watch_cb() notifications when another client updates it.
 * Creates the osd event first (its cookie identifies the watch), then
 * sends a WATCH op with flag=1 (add).  On failure the event is torn
 * down; on success rbd_dev->watch_event/watch_request stay live until
 * rbd_req_sync_unwatch().
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
                              const char *object_name,
                              u64 ver)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
        if (ret < 0)
                return ret;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(ver);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;  /* 1 = register the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1279
79e3057c
YS
/*
 * Request sync osd unwatch
 *
 * Tear down the watch registered by rbd_req_sync_watch(): send a
 * WATCH op with flag=0 (remove) carrying the same cookie, then cancel
 * the local osd event.  The event is canceled even if the op failed,
 * so no further callbacks can fire.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
                                const char *object_name)
{
        struct ceph_osd_req_op *ops;

        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
        if (ret < 0)
                return ret;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;  /* 0 = unregister the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL, NULL, NULL);

        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}
1308
/* Context handed to rbd_notify_cb() via ceph_osdc_create_event() */
struct rbd_notify_info {
        struct rbd_device *rbd_dev;
};
1312
1313static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314{
0ce1a794
AE
1315 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316 if (!rbd_dev)
59c2be1e
YS
1317 return;
1318
bd919d45
AE
1319 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320 rbd_dev->header_name, (unsigned long long) notify_id,
1321 (unsigned int) opcode);
59c2be1e
YS
1322}
1323
/*
 * Request sync osd notify
 *
 * Send a NOTIFY on @object_name and wait (bounded by
 * CEPH_OSD_TIMEOUT_DEFAULT) for watchers to acknowledge.  A temporary
 * osd event (rbd_notify_cb) collects the completion.
 *
 * NOTE(review): on the success path the event created here is never
 * passed to ceph_osdc_cancel_event() -- confirm ceph_osdc_wait_event()
 * releases it, otherwise this leaks an event per notify.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
                               const char *object_name)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        /* payload: notify protocol version (u32) + timeout (u32) */
        int payload_len = sizeof(u32) + sizeof(u32);
        int ret;

        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
        if (ret < 0)
                return ret;

        info.rbd_dev = rbd_dev;

        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = 1;
        ops[0].watch.flag = 1;
        ops[0].watch.cookie = event->cookie;
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;

        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
        dout("ceph_osdc_wait_event returned %d\n", ret);
        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(event);
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1374
602adf40
YS
/*
 * Request sync osd class-method execution
 *
 * (Comment fixed: this is not a read.)  Synchronously invoke
 * @class_name.@method_name on @object_name via a CALL op, passing
 * @len bytes of @data as input.  The object version is returned
 * through @ver.  Class and method name lengths are truncated to the
 * wire's __u8 fields -- callers pass short literal names.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
                                    class_name_len + method_name_len + len);
        if (ret < 0)
                return ret;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1414
1fec7093
YS
1415static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416{
1417 struct rbd_req_coll *coll =
1418 kzalloc(sizeof(struct rbd_req_coll) +
1419 sizeof(struct rbd_req_status) * num_reqs,
1420 GFP_ATOMIC);
1421
1422 if (!coll)
1423 return NULL;
1424 coll->total = num_reqs;
1425 kref_init(&coll->kref);
1426 return coll;
1427}
1428
602adf40
YS
/*
 * block device queue callback
 *
 * Drains the request queue: each filesystem request is split along
 * object-segment boundaries, each segment's bio chain is cloned and
 * submitted as an async OSD read/write, and a per-request collection
 * (coll) gathers the per-segment completions.  Called with
 * q->queue_lock held; the lock is dropped around the (sleeping)
 * submission work and re-taken before fetching the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                /* peek at request from block layer */
                if (!rq)
                        break;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* the submission path below sleeps; drop the queue lock */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* mapped snapshot was deleted under us: fail the I/O */
                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* pin the snap context so a refresh can't free it mid-I/O */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                /* one iteration per object segment */
                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_get_segment(&rbd_dev->header,
                                                  rbd_dev->header.object_prefix,
                                                  ofs, size,
                                                  NULL, NULL);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                /* drop the collection's initial reference */
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1548
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be merged into the
 * bio described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;     /* sectors per object */
        sector_t sector;
        unsigned int bio_sectors;       /* sectors already in the bio */
        int max;

        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        /* bytes remaining in the object after the bio's current end */
        max = (chunk_sectors - ((sector & (chunk_sectors - 1))
                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        /* always accept at least one bvec in an empty bio */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
1575
/*
 * Release the gendisk and request queue for @rbd_dev, and free the
 * in-memory image header.  Safe to call before the disk was created
 * (disk == NULL) or before it was added (GENHD_FL_UP not set).
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_header_free(&rbd_dev->header);

        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}
1591
/*
 * reload the ondisk header
 *
 * The header object is read in a loop: a first read of the fixed-size
 * part reveals the snapshot count; the buffer is then resized and the
 * whole header (plus snapshot records and names) re-read.  The loop
 * repeats if a snapshot was created between the two reads (count
 * changed).  On success the decoded header is left in *header with
 * its object version in header->obj_version.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        ssize_t rc;
        struct rbd_image_header_ondisk *dh;
        u32 snap_count = 0;
        u64 ver;
        size_t len;

        /*
         * First reads the fixed-size header to determine the number
         * of snapshots, then re-reads it, along with all snapshot
         * records as well as their stored names.
         */
        len = sizeof (*dh);
        while (1) {
                dh = kmalloc(len, GFP_KERNEL);
                if (!dh)
                        return -ENOMEM;

                rc = rbd_req_sync_read(rbd_dev,
                                       NULL, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, len,
                                       (char *)dh, &ver);
                if (rc < 0)
                        goto out_dh;

                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
                if (rc < 0) {
                        if (rc == -ENXIO)
                                pr_warning("unrecognized header format"
                                           " for image %s\n",
                                           rbd_dev->image_name);
                        goto out_dh;
                }

                /* stable snapshot count: we read a consistent header */
                if (snap_count == header->total_snaps)
                        break;

                snap_count = header->total_snaps;
                len = sizeof (*dh) +
                        snap_count * sizeof(struct rbd_image_snap_ondisk) +
                        header->snap_names_len;

                rbd_header_free(header);
                kfree(dh);
        }
        header->obj_version = ver;

out_dh:
        kfree(dh);
        return rc;
}
1649
1650/*
1651 * create a snapshot
1652 */
0ce1a794 1653static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1654 const char *snap_name,
1655 gfp_t gfp_flags)
1656{
1657 int name_len = strlen(snap_name);
1658 u64 new_snapid;
1659 int ret;
916d4d67 1660 void *data, *p, *e;
59c2be1e 1661 u64 ver;
1dbb4399 1662 struct ceph_mon_client *monc;
602adf40
YS
1663
1664 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1665 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1666 return -EINVAL;
1667
0ce1a794
AE
1668 monc = &rbd_dev->rbd_client->client->monc;
1669 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1670 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1671 if (ret < 0)
1672 return ret;
1673
1674 data = kmalloc(name_len + 16, gfp_flags);
1675 if (!data)
1676 return -ENOMEM;
1677
916d4d67
SW
1678 p = data;
1679 e = data + name_len + 16;
602adf40 1680
916d4d67
SW
1681 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1682 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1683
0bed54dc 1684 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1685 "rbd", "snap_add",
916d4d67 1686 data, p - data, &ver);
602adf40 1687
916d4d67 1688 kfree(data);
602adf40 1689
505cbb9b 1690 return ret < 0 ? ret : 0;
602adf40
YS
1691bad:
1692 return -ERANGE;
1693}
1694
dfc5606d
YS
/*
 * Unregister and drop every snapshot device attached to @rbd_dev.
 * Caller is expected to hold ctl_mutex (all other snap-list walks do).
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        /* _safe variant: __rbd_remove_snap_dev() unlinks the entry */
        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(rbd_dev, snap);
}
1703
602adf40
YS
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the on-disk header and swaps the freshly decoded fields
 * into rbd_dev->header under header_rwsem.  Ownership of h's snapc,
 * snap_names and snap_sizes transfers to rbd_dev->header; the old
 * copies are freed here.  Finally the snapshot device list is
 * re-synchronized against the new header.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = __rbd_init_snaps_header(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1749
/*
 * Create and publish the block device for a newly mapped image:
 * read the header, build the snapshot list, select the mapped snap,
 * then allocate the gendisk/request queue, size the queue limits to
 * the object size, and add_disk().
 *
 * NOTE(review): on the error paths after rbd_read_header() the header
 * and snap list are not torn down here -- presumably the caller's
 * cleanup (rbd_free_disk/remove path) handles that; confirm.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = __rbd_init_snaps_header(rbd_dev);
        if (rc)
                return rc;

        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;
        rbd_dev->q = q;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}
1822
dfc5606d
YS
/*
  sysfs
*/

/* Map a struct device embedded in an rbd_device back to its owner. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1831
dfc5606d
YS
1832static ssize_t rbd_size_show(struct device *dev,
1833 struct device_attribute *attr, char *buf)
1834{
593a9e7b 1835 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1836 sector_t size;
1837
1838 down_read(&rbd_dev->header_rwsem);
1839 size = get_capacity(rbd_dev->disk);
1840 up_read(&rbd_dev->header_rwsem);
dfc5606d 1841
a51aa0c0 1842 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1843}
1844
1845static ssize_t rbd_major_show(struct device *dev,
1846 struct device_attribute *attr, char *buf)
1847{
593a9e7b 1848 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1849
dfc5606d
YS
1850 return sprintf(buf, "%d\n", rbd_dev->major);
1851}
1852
1853static ssize_t rbd_client_id_show(struct device *dev,
1854 struct device_attribute *attr, char *buf)
602adf40 1855{
593a9e7b 1856 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1857
1dbb4399
AE
1858 return sprintf(buf, "client%lld\n",
1859 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1860}
1861
dfc5606d
YS
1862static ssize_t rbd_pool_show(struct device *dev,
1863 struct device_attribute *attr, char *buf)
602adf40 1864{
593a9e7b 1865 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1866
1867 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1868}
1869
9bb2f334
AE
1870static ssize_t rbd_pool_id_show(struct device *dev,
1871 struct device_attribute *attr, char *buf)
1872{
1873 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874
1875 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1876}
1877
dfc5606d
YS
1878static ssize_t rbd_name_show(struct device *dev,
1879 struct device_attribute *attr, char *buf)
1880{
593a9e7b 1881 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1882
0bed54dc 1883 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1884}
1885
1886static ssize_t rbd_snap_show(struct device *dev,
1887 struct device_attribute *attr,
1888 char *buf)
1889{
593a9e7b 1890 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1891
1892 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1893}
1894
/*
 * sysfs "refresh" (write-only): force a re-read of the image header
 * under ctl_mutex.  The written data is ignored; returns @size on
 * success or the refresh error.
 */
static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int rc;
        int ret = size;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rc = __rbd_refresh_header(rbd_dev);
        if (rc < 0)
                ret = rc;

        mutex_unlock(&ctl_mutex);
        return ret;
}
602adf40 1913
dfc5606d
YS
/* Per-device sysfs attributes (under /sys/bus/rbd/devices/<id>/). */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

/*
 * Intentionally empty: rbd_device lifetime is managed by the bus-level
 * release (rbd_dev_release), not the sysfs device release.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1955
1956
1957/*
1958 sysfs - snapshots
1959*/
1960
1961static ssize_t rbd_snap_size_show(struct device *dev,
1962 struct device_attribute *attr,
1963 char *buf)
1964{
1965 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966
3591538f 1967 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
1968}
1969
1970static ssize_t rbd_snap_id_show(struct device *dev,
1971 struct device_attribute *attr,
1972 char *buf)
1973{
1974 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1975
3591538f 1976 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
1977}
1978
/* Per-snapshot sysfs attributes (under the parent rbd device). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

/* Final put on the snap device frees the rbd_snap and its name. */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2008
/*
 * Unlink @snap from the device's snap list and unregister its sysfs
 * device; the final device reference frees it via rbd_snap_dev_release().
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
                                  struct rbd_snap *snap)
{
        list_del(&snap->node);
        device_unregister(&snap->dev);
}
2015
2016static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2017 struct rbd_snap *snap,
2018 struct device *parent)
2019{
2020 struct device *dev = &snap->dev;
2021 int ret;
2022
2023 dev->type = &rbd_snap_device_type;
2024 dev->parent = parent;
2025 dev->release = rbd_snap_dev_release;
2026 dev_set_name(dev, "snap_%s", snap->name);
2027 ret = device_register(dev);
2028
2029 return ret;
2030}
2031
2032static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2033 int i, const char *name,
2034 struct rbd_snap **snapp)
2035{
2036 int ret;
2037 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2038 if (!snap)
2039 return -ENOMEM;
2040 snap->name = kstrdup(name, GFP_KERNEL);
2041 snap->size = rbd_dev->header.snap_sizes[i];
2042 snap->id = rbd_dev->header.snapc->snaps[i];
2043 if (device_is_registered(&rbd_dev->dev)) {
2044 ret = rbd_register_snap_dev(rbd_dev, snap,
2045 &rbd_dev->dev);
2046 if (ret < 0)
2047 goto err;
2048 }
2049 *snapp = snap;
2050 return 0;
2051err:
2052 kfree(snap->name);
2053 kfree(snap);
2054 return ret;
2055}
2056
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points at the start of an entry in a '\0'-delimited name
 * list beginning at @start.  Returns the start of the preceding
 * entry, or NULL if @name is too close to @start for one to exist.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *p;

        if (name < start + 2)
                return NULL;

        /* step over the previous entry's terminating NUL ... */
        for (p = name - 2; *p; p--)
                /* ... and scan back to the list start or a NUL */
                if (p == start)
                        return start;

        return p + 1;
}
2073
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * Walks the existing rbd_snap list (oldest first, via _prev) in step
 * with the header's snapc->snaps[] / packed snap_names, removing
 * entries no longer in the header and inserting new ones.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        const char *name, *first_name;
        int i = rbd_dev->header.total_snaps;
        struct rbd_snap *snap, *old_snap = NULL;
        int ret;
        struct list_head *p, *n;

        first_name = rbd_dev->header.snap_names;
        /* start one past the end; rbd_prev_snap_name() walks backward */
        name = first_name + rbd_dev->header.snap_names_len;

        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
                u64 cur_id;

                old_snap = list_entry(p, struct rbd_snap, node);

                if (i)
                        cur_id = rbd_dev->header.snapc->snaps[i - 1];

                /* cur_id only read when i != 0 (short-circuit below) */
                if (!i || old_snap->id < cur_id) {
                        /*
                         * old_snap->id was skipped, thus was
                         * removed. If this rbd_dev is mapped to
                         * the removed snapshot, record that it no
                         * longer exists, to prevent further I/O.
                         */
                        if (rbd_dev->snap_id == old_snap->id)
                                rbd_dev->snap_exists = false;
                        __rbd_remove_snap_dev(rbd_dev, old_snap);
                        continue;
                }
                if (old_snap->id == cur_id) {
                        /* we have this snapshot already */
                        i--;
                        name = rbd_prev_snap_name(name, first_name);
                        continue;
                }
                for (; i > 0;
                     i--, name = rbd_prev_snap_name(name, first_name)) {
                        if (!name) {
                                WARN_ON(1);
                                return -EINVAL;
                        }
                        /*
                         * NOTE(review): this reads snaps[i] while the
                         * outer loop reads snaps[i - 1] -- confirm the
                         * off-by-one is intentional.
                         */
                        cur_id = rbd_dev->header.snapc->snaps[i];
                        /* snapshot removal? handle it above */
                        if (cur_id >= old_snap->id)
                                break;
                        /* a new snapshot */
                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                        if (ret < 0)
                                return ret;

                        /* note that we add it backward so using n and not p */
                        list_add(&snap->node, n);
                        p = &snap->node;
                }
        }
        /* we're done going over the old snap list, just add what's left */
        for (; i > 0; i--) {
                name = rbd_prev_snap_name(name, first_name);
                if (!name) {
                        WARN_ON(1);
                        return -EINVAL;
                }
                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                if (ret < 0)
                        return ret;
                list_add(&snap->node, &rbd_dev->snaps);
        }

        return 0;
}
2154
dfc5606d
YS
/*
 * Register the rbd device on the rbd bus (sysfs) and then register a
 * device for each of its snapshots.  Runs under ctl_mutex.  If a snap
 * registration fails the loop stops and the error is returned;
 * already-registered snaps are left for the caller's teardown.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        int ret;
        struct device *dev;
        struct rbd_snap *snap;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        dev = &rbd_dev->dev;

        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->id);
        ret = device_register(dev);
        if (ret < 0)
                goto out;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                ret = rbd_register_snap_dev(rbd_dev, snap,
                                             &rbd_dev->dev);
                if (ret < 0)
                        break;
        }
out:
        mutex_unlock(&ctl_mutex);
        return ret;
}
2183
dfc5606d
YS
/* Unregister the rbd device from sysfs (counterpart of rbd_bus_add_dev). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2188
59c2be1e
YS
/*
 * Establish the header-object watch for @rbd_dev.  -ERANGE from the
 * OSD means our header version is stale; refresh the header (under
 * ctl_mutex) and retry until the watch sticks or a different error
 * occurs.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
                                         rbd_dev->header.obj_version);
                if (ret == -ERANGE) {
                        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
                        rc = __rbd_refresh_header(rbd_dev);
                        mutex_unlock(&ctl_mutex);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}
2207
1ddbe94e
AE
/* Highest device id handed out so far (ids are never 0). */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->id = atomic64_inc_return(&rbd_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2222
1ddbe94e 2223/*
499afd5b
AE
2224 * Remove an rbd_dev from the global list, and record that its
2225 * identifier is no longer in use.
1ddbe94e 2226 */
499afd5b 2227static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2228{
d184f6bf
AE
2229 struct list_head *tmp;
2230 int rbd_id = rbd_dev->id;
2231 int max_id;
2232
2233 BUG_ON(rbd_id < 1);
499afd5b
AE
2234
2235 spin_lock(&rbd_dev_list_lock);
2236 list_del_init(&rbd_dev->node);
d184f6bf
AE
2237
2238 /*
2239 * If the id being "put" is not the current maximum, there
2240 * is nothing special we need to do.
2241 */
2242 if (rbd_id != atomic64_read(&rbd_id_max)) {
2243 spin_unlock(&rbd_dev_list_lock);
2244 return;
2245 }
2246
2247 /*
2248 * We need to update the current maximum id. Search the
2249 * list to find out what it is. We're more likely to find
2250 * the maximum at the end, so search the list backward.
2251 */
2252 max_id = 0;
2253 list_for_each_prev(tmp, &rbd_dev_list) {
2254 struct rbd_device *rbd_dev;
2255
2256 rbd_dev = list_entry(tmp, struct rbd_device, node);
2257 if (rbd_id > max_id)
2258 max_id = rbd_id;
2259 }
499afd5b 2260 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2261
1ddbe94e 2262 /*
d184f6bf
AE
2263 * The max id could have been updated by rbd_id_get(), in
2264 * which case it now accurately reflects the new maximum.
2265 * Be careful not to overwrite the maximum value in that
2266 * case.
1ddbe94e 2267 */
d184f6bf 2268 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2269}
2270
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The full set of characters for which isspace() returns
	 * nonzero in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, whitespace);	/* skip leading spaces */
	*buf = start;

	return strcspn(start, whitespace);	/* length of the token */
}
2289
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	/* Copy only when the token (plus its terminator) fits */
	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;	/* consumed even when too big to copy */

	return token_len;
}
2319
ea3352f4
AE
2320/*
2321 * Finds the next token in *buf, dynamically allocates a buffer big
2322 * enough to hold a copy of it, and copies the token into the new
2323 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2324 * that a duplicate buffer is created even for a zero-length token.
2325 *
2326 * Returns a pointer to the newly-allocated duplicate, or a null
2327 * pointer if memory for the duplicate was not available. If
2328 * the lenp argument is a non-null pointer, the length of the token
2329 * (not including the '\0') is returned in *lenp.
2330 *
2331 * If successful, the *buf pointer will be updated to point beyond
2332 * the end of the found token.
2333 *
2334 * Note: uses GFP_KERNEL for allocation.
2335 */
2336static inline char *dup_token(const char **buf, size_t *lenp)
2337{
2338 char *dup;
2339 size_t len;
2340
2341 len = next_token(buf);
2342 dup = kmalloc(len + 1, GFP_KERNEL);
2343 if (!dup)
2344 return NULL;
2345
2346 memcpy(dup, *buf, len);
2347 *(dup + len) = '\0';
2348 *buf += len;
2349
2350 if (lenp)
2351 *lenp = len;
2352
2353 return dup;
2354}
2355
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Expected token order in buf: monitor addresses, options, pool
 * name, image name, and an optional snapshot name.
 *
 * Returns 0 on success, -EINVAL on a malformed buffer, -ENOMEM on
 * allocation failure (with any partial allocations cleaned up).
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	/*
	 * The monitor address token is not copied; *mon_addrs points
	 * into the caller's buffer, and *mon_addrs_size includes room
	 * for a terminating '\0'.
	 */
	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;	/* options missing or too long to fit */

	/* Everything below can only fail on allocation */
	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Fields not yet allocated are NULL (zero-filled rbd_dev),
	 * and kfree(NULL) is a no-op, so this is safe at any stage */
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2435
59c2be1e
YS
2436static ssize_t rbd_add(struct bus_type *bus,
2437 const char *buf,
2438 size_t count)
602adf40 2439{
cb8627c7
AE
2440 char *options;
2441 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2442 const char *mon_addrs = NULL;
2443 size_t mon_addrs_size = 0;
27cc2594
AE
2444 struct ceph_osd_client *osdc;
2445 int rc = -ENOMEM;
602adf40
YS
2446
2447 if (!try_module_get(THIS_MODULE))
2448 return -ENODEV;
2449
60571c7d 2450 options = kmalloc(count, GFP_KERNEL);
602adf40 2451 if (!options)
27cc2594 2452 goto err_nomem;
cb8627c7
AE
2453 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2454 if (!rbd_dev)
2455 goto err_nomem;
602adf40
YS
2456
2457 /* static rbd_device initialization */
2458 spin_lock_init(&rbd_dev->lock);
2459 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2460 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2461 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2462
d184f6bf 2463 /* generate unique id: find highest unique id, add one */
499afd5b 2464 rbd_id_get(rbd_dev);
602adf40 2465
a725f65e 2466 /* Fill in the device name, now that we have its id. */
81a89793
AE
2467 BUILD_BUG_ON(DEV_NAME_LEN
2468 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2469 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2470
602adf40 2471 /* parse add command */
7ef3214a 2472 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2473 options, count);
a725f65e 2474 if (rc)
f0f8cef5 2475 goto err_put_id;
e124a82f 2476
5214ecc4
AE
2477 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2478 options);
d720bcb0
AE
2479 if (IS_ERR(rbd_dev->rbd_client)) {
2480 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2481 goto err_put_id;
d720bcb0 2482 }
602adf40 2483
602adf40 2484 /* pick the pool */
1dbb4399 2485 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2486 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2487 if (rc < 0)
2488 goto err_out_client;
9bb2f334 2489 rbd_dev->pool_id = rc;
602adf40
YS
2490
2491 /* register our block device */
27cc2594
AE
2492 rc = register_blkdev(0, rbd_dev->name);
2493 if (rc < 0)
602adf40 2494 goto err_out_client;
27cc2594 2495 rbd_dev->major = rc;
602adf40 2496
dfc5606d
YS
2497 rc = rbd_bus_add_dev(rbd_dev);
2498 if (rc)
766fc439
YS
2499 goto err_out_blkdev;
2500
32eec68d
AE
2501 /*
2502 * At this point cleanup in the event of an error is the job
2503 * of the sysfs code (initiated by rbd_bus_del_dev()).
2504 *
2505 * Set up and announce blkdev mapping.
2506 */
602adf40
YS
2507 rc = rbd_init_disk(rbd_dev);
2508 if (rc)
766fc439 2509 goto err_out_bus;
602adf40 2510
59c2be1e
YS
2511 rc = rbd_init_watch_dev(rbd_dev);
2512 if (rc)
2513 goto err_out_bus;
2514
602adf40
YS
2515 return count;
2516
766fc439 2517err_out_bus:
766fc439
YS
2518 /* this will also clean up rest of rbd_dev stuff */
2519
2520 rbd_bus_del_dev(rbd_dev);
2521 kfree(options);
766fc439
YS
2522 return rc;
2523
602adf40
YS
2524err_out_blkdev:
2525 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2526err_out_client:
2527 rbd_put_client(rbd_dev);
f0f8cef5 2528err_put_id:
cb8627c7 2529 if (rbd_dev->pool_name) {
820a5f3e 2530 kfree(rbd_dev->snap_name);
0bed54dc
AE
2531 kfree(rbd_dev->header_name);
2532 kfree(rbd_dev->image_name);
cb8627c7
AE
2533 kfree(rbd_dev->pool_name);
2534 }
499afd5b 2535 rbd_id_put(rbd_dev);
27cc2594 2536err_nomem:
27cc2594 2537 kfree(rbd_dev);
cb8627c7 2538 kfree(options);
27cc2594 2539
602adf40
YS
2540 dout("Error adding device %s\n", buf);
2541 module_put(THIS_MODULE);
27cc2594
AE
2542
2543 return (ssize_t) rc;
602adf40
YS
2544}
2545
2546static struct rbd_device *__rbd_get_dev(unsigned long id)
2547{
2548 struct list_head *tmp;
2549 struct rbd_device *rbd_dev;
2550
e124a82f 2551 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2552 list_for_each(tmp, &rbd_dev_list) {
2553 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2554 if (rbd_dev->id == id) {
2555 spin_unlock(&rbd_dev_list_lock);
602adf40 2556 return rbd_dev;
e124a82f 2557 }
602adf40 2558 }
e124a82f 2559 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2560 return NULL;
2561}
2562
/*
 * Release callback for an rbd device (installed as dev->release by
 * rbd_bus_add_dev()); runs once the device registered there is
 * unregistered and its last reference is dropped.
 *
 * Undoes everything rbd_add() set up: the header watch, the ceph
 * client, the disk and block device, the name strings, and finally
 * the device id and the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object for changes */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2593
/*
 * Handle a write to /sys/bus/rbd/remove: parse a device id from
 * buf and tear down the corresponding mapped device.
 *
 * Returns count on success, -EINVAL if the id does not fit in an
 * int, -ENOENT if no device with that id exists, or the strtoul
 * error for malformed input.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* ctl_mutex serializes this against other control operations */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* drop the snapshots, then the device itself (which ends in
	 * rbd_dev_release() freeing the rest) */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2627
dfc5606d
YS
2628static ssize_t rbd_snap_add(struct device *dev,
2629 struct device_attribute *attr,
2630 const char *buf,
2631 size_t count)
602adf40 2632{
593a9e7b 2633 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
2634 int ret;
2635 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2636 if (!name)
2637 return -ENOMEM;
2638
dfc5606d 2639 snprintf(name, count, "%s", buf);
602adf40
YS
2640
2641 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2642
602adf40
YS
2643 ret = rbd_header_add_snap(rbd_dev,
2644 name, GFP_KERNEL);
2645 if (ret < 0)
59c2be1e 2646 goto err_unlock;
602adf40 2647
263c6ca0 2648 ret = __rbd_refresh_header(rbd_dev);
602adf40 2649 if (ret < 0)
59c2be1e
YS
2650 goto err_unlock;
2651
2652 /* shouldn't hold ctl_mutex when notifying.. notify might
2653 trigger a watch callback that would need to get that mutex */
2654 mutex_unlock(&ctl_mutex);
2655
2656 /* make a best effort, don't error if failed */
0bed54dc 2657 rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
602adf40
YS
2658
2659 ret = count;
59c2be1e
YS
2660 kfree(name);
2661 return ret;
2662
2663err_unlock:
602adf40 2664 mutex_unlock(&ctl_mutex);
602adf40
YS
2665 kfree(name);
2666 return ret;
2667}
2668
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 *
 * Registers the root device first, then the bus type; if bus
 * registration fails the root device is unregistered again so no
 * partial state is left behind.
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}
2687
/* Remove the sysfs control files; reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2693
/*
 * Module entry point: just set up the sysfs interface.  Devices
 * are mapped later via writes to /sys/bus/rbd/add.
 */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2704
/* Module exit point: tear down the sysfs bus and root device */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2709
2710module_init(rbd_init);
2711module_exit(rbd_exit);
2712
2713MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2714MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2715MODULE_DESCRIPTION("rados block device");
2716
2717/* following authorship retained from original osdblk.c */
2718MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2719
2720MODULE_LICENSE("GPL");