rbd: use a single value of snap_name to mean no snap
[linux-2.6-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44#define DRV_NAME "rbd"
45#define DRV_NAME_LONG "rbd (rados block device)"
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
56#define DEV_NAME_LEN 32
57
59c2be1e
YS
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char block_name[32];		/* prefix for data object names */
	__u8 obj_order;			/* log2 of object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* protects the snapshot state below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;		/* total bytes in snap_names */
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;		/* NUL-separated names, oldest last
					   (walked in snap_by_name()) */
	u64 *snap_sizes;		/* per-snapshot image sizes */

	u64 obj_version;		/* header object version, used when
					   acking watch notifications */
};

/* rbd-specific mount options (filled in by parse_rbd_opts_token()) */
struct rbd_options {
	int notify_timeout;
};
84
/*
 * an instance of the client. multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry in rbd_client_list */
};
94
1fec7093
YS
struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* slot in coll->status[] */
	struct rbd_req_coll *coll;	/* owning collection, may be NULL */
};

/* completion status of one sub-request within a collection */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;			/* number of status slots */
	int num_done;			/* count completed in order so far */
	struct kref kref;		/* one ref per outstanding sub-request */
	struct rbd_req_status status[0];	/* old-style flexible array */
};

/* in-memory snapshot record, exposed through sysfs */
struct rbd_snap {
	struct device dev;
	const char *name;
	size_t size;
	struct list_head node;		/* entry in rbd_device->snaps */
	u64 id;
};
132
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared ceph client handle */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	/* watch on the header object, set up by rbd_req_sync_watch() */
	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;

	struct list_head node;	/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};

/* sysfs bus on which rbd devices appear */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};
176
21079786 177static DEFINE_SPINLOCK(node_lock); /* protects client get/put */
602adf40 178
602adf40
YS
179static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
180static LIST_HEAD(rbd_dev_list); /* devices */
181static LIST_HEAD(rbd_client_list); /* clients */
182
dfc5606d
YS
183static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
184static void rbd_dev_release(struct device *dev);
dfc5606d
YS
185static ssize_t rbd_snap_add(struct device *dev,
186 struct device_attribute *attr,
187 const char *buf,
188 size_t count);
189static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 190 struct rbd_snap *snap);
dfc5606d
YS
191
192
193static struct rbd_device *dev_to_rbd(struct device *dev)
194{
195 return container_of(dev, struct rbd_device, dev);
196}
197
198static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
199{
200 return get_device(&rbd_dev->dev);
201}
202
203static void rbd_put_dev(struct rbd_device *rbd_dev)
204{
205 put_device(&rbd_dev->dev);
206}
602adf40 207
59c2be1e
YS
208static int __rbd_update_snaps(struct rbd_device *rbd_dev);
209
602adf40
YS
210static int rbd_open(struct block_device *bdev, fmode_t mode)
211{
212 struct gendisk *disk = bdev->bd_disk;
213 struct rbd_device *rbd_dev = disk->private_data;
214
dfc5606d
YS
215 rbd_get_dev(rbd_dev);
216
602adf40
YS
217 set_device_ro(bdev, rbd_dev->read_only);
218
219 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
220 return -EROFS;
221
222 return 0;
223}
224
dfc5606d
YS
225static int rbd_release(struct gendisk *disk, fmode_t mode)
226{
227 struct rbd_device *rbd_dev = disk->private_data;
228
229 rbd_put_dev(rbd_dev);
230
231 return 0;
232}
233
602adf40
YS
/* block device operations; rbd supports only open/release */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
239
/*
 * Initialize an rbd client instance.
 * We own *opt.
 *
 * On success the new client is on rbd_client_list and has taken
 * ownership of both @opt (via ceph_create_client()) and @rbd_opts.
 * On failure @opt is destroyed here (unless already handed off) and
 * @rbd_opts remains owned by the caller.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
	/*
	 * NOTE(review): on IS_ERR we return -ENOMEM rather than
	 * PTR_ERR(rbdc->client) — confirm whether the real error
	 * should be propagated.
	 */
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&node_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&node_lock);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}
285
286/*
287 * Find a ceph client with specific addr and configuration.
288 */
289static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
290{
291 struct rbd_client *client_node;
292
293 if (opt->flags & CEPH_OPT_NOSHARE)
294 return NULL;
295
296 list_for_each_entry(client_node, &rbd_client_list, node)
297 if (ceph_compare_options(opt, client_node->client) == 0)
298 return client_node;
299 return NULL;
300}
301
59c2be1e
YS
/*
 * mount options
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* token table consumed by match_token() in parse_rbd_opts_token() */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
319
320static int parse_rbd_opts_token(char *c, void *private)
321{
322 struct rbd_options *rbdopt = private;
323 substring_t argstr[MAX_OPT_ARGS];
324 int token, intval, ret;
325
21079786 326 token = match_token(c, rbdopt_tokens, argstr);
59c2be1e
YS
327 if (token < 0)
328 return -EINVAL;
329
330 if (token < Opt_last_int) {
331 ret = match_int(&argstr[0], &intval);
332 if (ret < 0) {
333 pr_err("bad mount option arg (not int) "
334 "at '%s'\n", c);
335 return ret;
336 }
337 dout("got int token %d val %d\n", token, intval);
338 } else if (token > Opt_last_int && token < Opt_last_string) {
339 dout("got string token %d val %s\n", token,
340 argstr[0].from);
341 } else {
342 dout("got token %d\n", token);
343 }
344
345 switch (token) {
346 case Opt_notify_timeout:
347 rbdopt->notify_timeout = intval;
348 break;
349 default:
350 BUG_ON(token);
351 }
352 return 0;
353}
354
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client holds a referenced client (shared
 * with other devices when the options allow).  Returns 0 or a
 * negative errno.  rbd_opts is freed here on every path that does not
 * hand it over to a newly created client.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				 mon_addr + strlen(mon_addr),
				 parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		ret = PTR_ERR(opt);
		goto done_err;
	}

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* using an existing client: opt and rbd_opts not needed */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	/*
	 * NOTE(review): node_lock is dropped before rbd_client_create()
	 * registers the new client, so two racing mounts may each create
	 * an (unshared) client.  Presumably harmless — confirm.
	 */
	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
407
/*
 * Destroy ceph client
 *
 * Caller must hold node_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* unlink from rbd_client_list (node_lock held by caller) */
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
424
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* node_lock protects the client list edited by rbd_client_release() */
	spin_lock(&node_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&node_lock);
	rbd_dev->rbd_client = NULL;
}
436
1fec7093
YS
437/*
438 * Destroy requests collection
439 */
440static void rbd_coll_release(struct kref *kref)
441{
442 struct rbd_req_coll *coll =
443 container_of(kref, struct rbd_req_coll, kref);
444
445 dout("rbd_coll_release %p\n", coll);
446 kfree(coll);
447}
602adf40
YS
448
449/*
450 * Create a new header structure, translate header format from the on-disk
451 * header.
452 */
453static int rbd_header_from_disk(struct rbd_image_header *header,
454 struct rbd_image_header_ondisk *ondisk,
455 int allocated_snaps,
456 gfp_t gfp_flags)
457{
458 int i;
459 u32 snap_count = le32_to_cpu(ondisk->snap_count);
460 int ret = -ENOMEM;
461
21079786 462 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 463 return -ENXIO;
81e759fb 464
602adf40 465 init_rwsem(&header->snap_rwsem);
602adf40
YS
466 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
467 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
21079786 468 snap_count * sizeof (*ondisk),
602adf40
YS
469 gfp_flags);
470 if (!header->snapc)
471 return -ENOMEM;
472 if (snap_count) {
473 header->snap_names = kmalloc(header->snap_names_len,
474 GFP_KERNEL);
475 if (!header->snap_names)
476 goto err_snapc;
477 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 GFP_KERNEL);
479 if (!header->snap_sizes)
480 goto err_names;
481 } else {
482 header->snap_names = NULL;
483 header->snap_sizes = NULL;
484 }
485 memcpy(header->block_name, ondisk->block_name,
486 sizeof(ondisk->block_name));
487
488 header->image_size = le64_to_cpu(ondisk->image_size);
489 header->obj_order = ondisk->options.order;
490 header->crypt_type = ondisk->options.crypt_type;
491 header->comp_type = ondisk->options.comp_type;
492
493 atomic_set(&header->snapc->nref, 1);
494 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 header->snapc->num_snaps = snap_count;
496 header->total_snaps = snap_count;
497
21079786 498 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
499 for (i = 0; i < snap_count; i++) {
500 header->snapc->snaps[i] =
501 le64_to_cpu(ondisk->snaps[i].id);
502 header->snap_sizes[i] =
503 le64_to_cpu(ondisk->snaps[i].image_size);
504 }
505
506 /* copy snapshot names */
507 memcpy(header->snap_names, &ondisk->snaps[i],
508 header->snap_names_len);
509 }
510
511 return 0;
512
513err_names:
514 kfree(header->snap_names);
515err_snapc:
516 kfree(header->snapc);
517 return ret;
518}
519
520static int snap_index(struct rbd_image_header *header, int snap_num)
521{
522 return header->total_snaps - snap_num;
523}
524
525static u64 cur_snap_id(struct rbd_device *rbd_dev)
526{
527 struct rbd_image_header *header = &rbd_dev->header;
528
529 if (!rbd_dev->cur_snap)
530 return 0;
531
532 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
533}
534
535static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
536 u64 *seq, u64 *size)
537{
538 int i;
539 char *p = header->snap_names;
540
541 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
542 if (strcmp(snap_name, p) == 0)
543 break;
544 }
545 if (i == header->total_snaps)
546 return -ENOENT;
547 if (seq)
548 *seq = header->snapc->snaps[i];
549
550 if (size)
551 *size = header->snap_sizes[i];
552
553 return i;
554}
555
cc9d734c 556static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
602adf40
YS
557{
558 struct rbd_image_header *header = &dev->header;
559 struct ceph_snap_context *snapc = header->snapc;
560 int ret = -ENOENT;
561
cc9d734c
JD
562 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
563
602adf40
YS
564 down_write(&header->snap_rwsem);
565
cc9d734c
JD
566 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
567 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
568 if (header->total_snaps)
569 snapc->seq = header->snap_seq;
570 else
571 snapc->seq = 0;
572 dev->cur_snap = 0;
573 dev->read_only = 0;
574 if (size)
575 *size = header->image_size;
576 } else {
cc9d734c 577 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
602adf40
YS
578 if (ret < 0)
579 goto done;
580
581 dev->cur_snap = header->total_snaps - ret;
582 dev->read_only = 1;
583 }
584
585 ret = 0;
586done:
587 up_write(&header->snap_rwsem);
588 return ret;
589}
590
591static void rbd_header_free(struct rbd_image_header *header)
592{
593 kfree(header->snapc);
594 kfree(header->snap_names);
595 kfree(header->snap_sizes);
596}
597
598/*
599 * get the actual striped segment name, offset and length
600 */
601static u64 rbd_get_segment(struct rbd_image_header *header,
602 const char *block_name,
603 u64 ofs, u64 len,
604 char *seg_name, u64 *segofs)
605{
606 u64 seg = ofs >> header->obj_order;
607
608 if (seg_name)
609 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
610 "%s.%012llx", block_name, seg);
611
612 ofs = ofs & ((1 << header->obj_order) - 1);
613 len = min_t(u64, len, (1 << header->obj_order) - ofs);
614
615 if (segofs)
616 *segofs = ofs;
617
618 return len;
619}
620
1fec7093
YS
621static int rbd_get_num_segments(struct rbd_image_header *header,
622 u64 ofs, u64 len)
623{
624 u64 start_seg = ofs >> header->obj_order;
625 u64 end_seg = (ofs + len - 1) >> header->obj_order;
626 return end_seg - start_seg + 1;
627}
628
029bcbd8
JD
629/*
630 * returns the size of an object in the image
631 */
632static u64 rbd_obj_bytes(struct rbd_image_header *header)
633{
634 return 1 << header->obj_order;
635}
636
602adf40
YS
637/*
638 * bio helpers
639 */
640
641static void bio_chain_put(struct bio *chain)
642{
643 struct bio *tmp;
644
645 while (chain) {
646 tmp = chain;
647 chain = chain->bi_next;
648 bio_put(tmp);
649 }
650}
651
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero this segment from start_ofs (or its
				   beginning, if start_ofs is already past) */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
678
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * @old:  in/out; on return points at the first un-consumed bio
 * @next: out; the continuation point (either the remainder of a split
 *        bio, or the next bio of the original chain)
 * @bp:   in/out; a pair left over from a previous call is released
 *        here, and a new one may be stored when a bio is split
 * Returns the cloned chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/* this bio straddles the boundary: split it */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* after the first clone, further allocs must not block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
755
756/*
757 * helpers for osd request op vectors.
758 */
759static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
760 int num_ops,
761 int opcode,
762 u32 payload_len)
763{
764 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
765 GFP_NOIO);
766 if (!*ops)
767 return -ENOMEM;
768 (*ops)[0].op = opcode;
769 /*
770 * op extent offset and length will be set later on
771 * in calc_raw_layout()
772 */
773 (*ops)[0].payload_len = payload_len;
774 return 0;
775}
776
777static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
778{
779 kfree(ops);
780}
781
1fec7093
YS
/*
 * Complete the index'th sub-request of a collection.  Completions are
 * reported to the block layer strictly in order: the status is
 * recorded under the queue lock, then every contiguous run of done
 * sub-requests starting at num_done is ended with __blk_end_request(),
 * dropping one collection reference per completed entry.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: complete directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

/* Complete the sub-request described by an rbd_request. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
825
602adf40
YS
826/*
827 * Send ceph osd request
828 */
829static int rbd_do_request(struct request *rq,
830 struct rbd_device *dev,
831 struct ceph_snap_context *snapc,
832 u64 snapid,
833 const char *obj, u64 ofs, u64 len,
834 struct bio *bio,
835 struct page **pages,
836 int num_pages,
837 int flags,
838 struct ceph_osd_req_op *ops,
839 int num_reply,
1fec7093
YS
840 struct rbd_req_coll *coll,
841 int coll_index,
602adf40 842 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
843 struct ceph_msg *msg),
844 struct ceph_osd_request **linger_req,
845 u64 *ver)
602adf40
YS
846{
847 struct ceph_osd_request *req;
848 struct ceph_file_layout *layout;
849 int ret;
850 u64 bno;
851 struct timespec mtime = CURRENT_TIME;
852 struct rbd_request *req_data;
853 struct ceph_osd_request_head *reqhead;
854 struct rbd_image_header *header = &dev->header;
1dbb4399 855 struct ceph_osd_client *osdc;
602adf40 856
602adf40 857 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
858 if (!req_data) {
859 if (coll)
860 rbd_coll_end_req_index(rq, coll, coll_index,
861 -ENOMEM, len);
862 return -ENOMEM;
863 }
864
865 if (coll) {
866 req_data->coll = coll;
867 req_data->coll_index = coll_index;
868 }
602adf40 869
1fec7093 870 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
602adf40
YS
871
872 down_read(&header->snap_rwsem);
873
1dbb4399
AE
874 osdc = &dev->rbd_client->client->osdc;
875 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
876 false, GFP_NOIO, pages, bio);
4ad12621 877 if (!req) {
602adf40 878 up_read(&header->snap_rwsem);
4ad12621 879 ret = -ENOMEM;
602adf40
YS
880 goto done_pages;
881 }
882
883 req->r_callback = rbd_cb;
884
885 req_data->rq = rq;
886 req_data->bio = bio;
887 req_data->pages = pages;
888 req_data->len = len;
889
890 req->r_priv = req_data;
891
892 reqhead = req->r_request->front.iov_base;
893 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
894
895 strncpy(req->r_oid, obj, sizeof(req->r_oid));
896 req->r_oid_len = strlen(req->r_oid);
897
898 layout = &req->r_file_layout;
899 memset(layout, 0, sizeof(*layout));
900 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
901 layout->fl_stripe_count = cpu_to_le32(1);
902 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
903 layout->fl_pg_preferred = cpu_to_le32(-1);
904 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
1dbb4399
AE
905 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
906 req, ops);
602adf40
YS
907
908 ceph_osdc_build_request(req, ofs, &len,
909 ops,
910 snapc,
911 &mtime,
912 req->r_oid, req->r_oid_len);
913 up_read(&header->snap_rwsem);
914
59c2be1e 915 if (linger_req) {
1dbb4399 916 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
917 *linger_req = req;
918 }
919
1dbb4399 920 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
921 if (ret < 0)
922 goto done_err;
923
924 if (!rbd_cb) {
1dbb4399 925 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
926 if (ver)
927 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
928 dout("reassert_ver=%lld\n",
929 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
930 ceph_osdc_put_request(req);
931 }
932 return ret;
933
934done_err:
935 bio_chain_put(req_data->bio);
936 ceph_osdc_put_request(req);
937done_pages:
1fec7093 938 rbd_coll_end_req(req_data, ret, len);
602adf40 939 kfree(req_data);
602adf40
YS
940 return ret;
941}
942
/*
 * Ceph osd op callback
 *
 * Completes an asynchronous request: for reads, a missing object
 * (-ENOENT) or a short read is turned into zero-filled data, then the
 * sub-request is completed and all per-request resources are dropped.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* reading a hole: the object doesn't exist, return zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the rest of the request */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
981
59c2be1e
YS
/* Completion callback for requests needing no reply processing. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
986
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the data, optionally builds a
 * single-op vector when @orig_ops is NULL (copying @buf into the pages
 * for writes), then issues the request with no callback so
 * rbd_do_request() waits for completion.  For reads the result is
 * copied back into @buf.  Returns bytes transferred or negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* build a default single-op vector for the caller */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1049
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the (ofs, len) image extent onto a single object segment and
 * issues the request; completion is reported via rbd_req_cb().  The
 * extent must not cross a segment boundary (the bios were already
 * split accordingly by the caller).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1105
1106/*
1107 * Request async osd write
1108 */
1109static int rbd_req_write(struct request *rq,
1110 struct rbd_device *rbd_dev,
1111 struct ceph_snap_context *snapc,
1112 u64 ofs, u64 len,
1fec7093
YS
1113 struct bio *bio,
1114 struct rbd_req_coll *coll,
1115 int coll_index)
602adf40
YS
1116{
1117 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1118 CEPH_OSD_OP_WRITE,
1119 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1120 2,
1fec7093 1121 ofs, len, bio, coll, coll_index);
602adf40
YS
1122}
1123
1124/*
1125 * Request async osd read
1126 */
1127static int rbd_req_read(struct request *rq,
1128 struct rbd_device *rbd_dev,
1129 u64 snapid,
1130 u64 ofs, u64 len,
1fec7093
YS
1131 struct bio *bio,
1132 struct rbd_req_coll *coll,
1133 int coll_index)
602adf40
YS
1134{
1135 return rbd_do_op(rq, rbd_dev, NULL,
1136 (snapid ? snapid : CEPH_NOSNAP),
1137 CEPH_OSD_OP_READ,
1138 CEPH_OSD_FLAG_READ,
1139 2,
1fec7093 1140 ofs, len, bio, coll, coll_index);
602adf40
YS
1141}
1142
1143/*
1144 * Request sync osd read
1145 */
1146static int rbd_req_sync_read(struct rbd_device *dev,
1147 struct ceph_snap_context *snapc,
1148 u64 snapid,
1149 const char *obj,
1150 u64 ofs, u64 len,
59c2be1e
YS
1151 char *buf,
1152 u64 *ver)
602adf40
YS
1153{
1154 return rbd_req_sync_op(dev, NULL,
1155 (snapid ? snapid : CEPH_NOSNAP),
1156 CEPH_OSD_OP_READ,
1157 CEPH_OSD_FLAG_READ,
1158 NULL,
59c2be1e 1159 1, obj, ofs, len, buf, NULL, ver);
602adf40
YS
1160}
1161
/*
 * Request sync osd watch
 *
 * Acknowledge a watch notification on object @obj so that the
 * notifier's synchronous notify can complete.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	/* fire-and-forget: rbd_simple_req_cb just drops the request */
	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1194
/*
 * Watch-event callback for the header object: the header changed
 * (e.g. a snapshot was added), so refresh the in-memory snapshot
 * state and acknowledge the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* always ack, even if the update failed, to unblock the notifier */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1214
/*
 * Request sync osd watch
 *
 * Register a watch on header object @obj so rbd_watch_cb() is invoked
 * when another client updates the header.  The watch request lingers
 * (stored in dev->watch_request) so it survives osd reconnects.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* non-zero: add watch (cf. unwatch below) */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1259
79e3057c
YS
1260/*
1261 * Request sync osd unwatch
1262 */
1263static int rbd_req_sync_unwatch(struct rbd_device *dev,
1264 const char *obj)
1265{
1266 struct ceph_osd_req_op *ops;
1267
1268 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1269 if (ret < 0)
1270 return ret;
1271
1272 ops[0].watch.ver = 0;
1273 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1274 ops[0].watch.flag = 0;
1275
1276 ret = rbd_req_sync_op(dev, NULL,
1277 CEPH_NOSNAP,
1278 0,
1279 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1280 ops,
1281 1, obj, 0, 0, NULL, NULL, NULL);
1282
1283 rbd_destroy_ops(ops);
1284 ceph_osdc_cancel_event(dev->watch_event);
1285 dev->watch_event = NULL;
1286 return ret;
1287}
1288
59c2be1e
YS
1289struct rbd_notify_info {
1290 struct rbd_device *dev;
1291};
1292
1293static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1294{
1295 struct rbd_device *dev = (struct rbd_device *)data;
1296 if (!dev)
1297 return;
1298
1299 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1300 notify_id, (int)opcode);
1301}
1302
1303/*
1304 * Request sync osd notify
1305 */
1306static int rbd_req_sync_notify(struct rbd_device *dev,
1307 const char *obj)
1308{
1309 struct ceph_osd_req_op *ops;
1dbb4399 1310 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1311 struct ceph_osd_event *event;
1312 struct rbd_notify_info info;
1313 int payload_len = sizeof(u32) + sizeof(u32);
1314 int ret;
1315
1316 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1317 if (ret < 0)
1318 return ret;
1319
1320 info.dev = dev;
1321
1322 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1323 (void *)&info, &event);
1324 if (ret < 0)
1325 goto fail;
1326
1327 ops[0].watch.ver = 1;
1328 ops[0].watch.flag = 1;
1329 ops[0].watch.cookie = event->cookie;
1330 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1331 ops[0].watch.timeout = 12;
1332
1333 ret = rbd_req_sync_op(dev, NULL,
1334 CEPH_NOSNAP,
1335 0,
1336 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1337 ops,
1338 1, obj, 0, 0, NULL, NULL, NULL);
1339 if (ret < 0)
1340 goto fail_event;
1341
1342 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1343 dout("ceph_osdc_wait_event returned %d\n", ret);
1344 rbd_destroy_ops(ops);
1345 return 0;
1346
1347fail_event:
1348 ceph_osdc_cancel_event(event);
1349fail:
1350 rbd_destroy_ops(ops);
1351 return ret;
1352}
1353
602adf40
YS
1354/*
1355 * Request sync osd read
1356 */
1357static int rbd_req_sync_exec(struct rbd_device *dev,
1358 const char *obj,
1359 const char *cls,
1360 const char *method,
1361 const char *data,
59c2be1e
YS
1362 int len,
1363 u64 *ver)
602adf40
YS
1364{
1365 struct ceph_osd_req_op *ops;
1366 int cls_len = strlen(cls);
1367 int method_len = strlen(method);
1368 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1369 cls_len + method_len + len);
1370 if (ret < 0)
1371 return ret;
1372
1373 ops[0].cls.class_name = cls;
1374 ops[0].cls.class_len = (__u8)cls_len;
1375 ops[0].cls.method_name = method;
1376 ops[0].cls.method_len = (__u8)method_len;
1377 ops[0].cls.argc = 0;
1378 ops[0].cls.indata = data;
1379 ops[0].cls.indata_len = len;
1380
1381 ret = rbd_req_sync_op(dev, NULL,
1382 CEPH_NOSNAP,
1383 0,
1384 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1385 ops,
59c2be1e 1386 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1387
1388 rbd_destroy_ops(ops);
1389
1390 dout("cls_exec returned %d\n", ret);
1391 return ret;
1392}
1393
1fec7093
YS
1394static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1395{
1396 struct rbd_req_coll *coll =
1397 kzalloc(sizeof(struct rbd_req_coll) +
1398 sizeof(struct rbd_req_status) * num_reqs,
1399 GFP_ATOMIC);
1400
1401 if (!coll)
1402 return NULL;
1403 coll->total = num_reqs;
1404 kref_init(&coll->kref);
1405 return coll;
1406}
1407
602adf40
YS
1408/*
1409 * block device queue callback
1410 */
1411static void rbd_rq_fn(struct request_queue *q)
1412{
1413 struct rbd_device *rbd_dev = q->queuedata;
1414 struct request *rq;
1415 struct bio_pair *bp = NULL;
1416
1417 rq = blk_fetch_request(q);
1418
1419 while (1) {
1420 struct bio *bio;
1421 struct bio *rq_bio, *next_bio = NULL;
1422 bool do_write;
1423 int size, op_size = 0;
1424 u64 ofs;
1fec7093
YS
1425 int num_segs, cur_seg = 0;
1426 struct rbd_req_coll *coll;
602adf40
YS
1427
1428 /* peek at request from block layer */
1429 if (!rq)
1430 break;
1431
1432 dout("fetched request\n");
1433
1434 /* filter out block requests we don't understand */
1435 if ((rq->cmd_type != REQ_TYPE_FS)) {
1436 __blk_end_request_all(rq, 0);
1437 goto next;
1438 }
1439
1440 /* deduce our operation (read, write) */
1441 do_write = (rq_data_dir(rq) == WRITE);
1442
1443 size = blk_rq_bytes(rq);
1444 ofs = blk_rq_pos(rq) * 512ULL;
1445 rq_bio = rq->bio;
1446 if (do_write && rbd_dev->read_only) {
1447 __blk_end_request_all(rq, -EROFS);
1448 goto next;
1449 }
1450
1451 spin_unlock_irq(q->queue_lock);
1452
1453 dout("%s 0x%x bytes at 0x%llx\n",
1454 do_write ? "write" : "read",
1455 size, blk_rq_pos(rq) * 512ULL);
1456
1fec7093
YS
1457 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1458 coll = rbd_alloc_coll(num_segs);
1459 if (!coll) {
1460 spin_lock_irq(q->queue_lock);
1461 __blk_end_request_all(rq, -ENOMEM);
1462 goto next;
1463 }
1464
602adf40
YS
1465 do {
1466 /* a bio clone to be passed down to OSD req */
1467 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1468 op_size = rbd_get_segment(&rbd_dev->header,
1469 rbd_dev->header.block_name,
1470 ofs, size,
1471 NULL, NULL);
1fec7093 1472 kref_get(&coll->kref);
602adf40
YS
1473 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1474 op_size, GFP_ATOMIC);
1475 if (!bio) {
1fec7093
YS
1476 rbd_coll_end_req_index(rq, coll, cur_seg,
1477 -ENOMEM, op_size);
1478 goto next_seg;
602adf40
YS
1479 }
1480
1fec7093 1481
602adf40
YS
1482 /* init OSD command: write or read */
1483 if (do_write)
1484 rbd_req_write(rq, rbd_dev,
1485 rbd_dev->header.snapc,
1486 ofs,
1fec7093
YS
1487 op_size, bio,
1488 coll, cur_seg);
602adf40
YS
1489 else
1490 rbd_req_read(rq, rbd_dev,
1491 cur_snap_id(rbd_dev),
1492 ofs,
1fec7093
YS
1493 op_size, bio,
1494 coll, cur_seg);
602adf40 1495
1fec7093 1496next_seg:
602adf40
YS
1497 size -= op_size;
1498 ofs += op_size;
1499
1fec7093 1500 cur_seg++;
602adf40
YS
1501 rq_bio = next_bio;
1502 } while (size > 0);
1fec7093 1503 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1504
1505 if (bp)
1506 bio_pair_release(bp);
602adf40
YS
1507 spin_lock_irq(q->queue_lock);
1508next:
1509 rq = blk_fetch_request(q);
1510 }
1511}
1512
1513/*
1514 * a queue callback. Makes sure that we don't create a bio that spans across
1515 * multiple osd objects. One exception would be with a single page bios,
1516 * which we handle later at bio_chain_clone
1517 */
1518static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1519 struct bio_vec *bvec)
1520{
1521 struct rbd_device *rbd_dev = q->queuedata;
1522 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1523 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1524 unsigned int bio_sectors = bmd->bi_size >> 9;
1525 int max;
1526
1527 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1528 + bio_sectors)) << 9;
1529 if (max < 0)
1530 max = 0; /* bio_add cannot handle a negative return */
1531 if (max <= bvec->bv_len && bio_sectors == 0)
1532 return bvec->bv_len;
1533 return max;
1534}
1535
1536static void rbd_free_disk(struct rbd_device *rbd_dev)
1537{
1538 struct gendisk *disk = rbd_dev->disk;
1539
1540 if (!disk)
1541 return;
1542
1543 rbd_header_free(&rbd_dev->header);
1544
1545 if (disk->flags & GENHD_FL_UP)
1546 del_gendisk(disk);
1547 if (disk->queue)
1548 blk_cleanup_queue(disk->queue);
1549 put_disk(disk);
1550}
1551
1552/*
1553 * reload the ondisk the header
1554 */
1555static int rbd_read_header(struct rbd_device *rbd_dev,
1556 struct rbd_image_header *header)
1557{
1558 ssize_t rc;
1559 struct rbd_image_header_ondisk *dh;
1560 int snap_count = 0;
1561 u64 snap_names_len = 0;
59c2be1e 1562 u64 ver;
602adf40
YS
1563
1564 while (1) {
1565 int len = sizeof(*dh) +
1566 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1567 snap_names_len;
1568
1569 rc = -ENOMEM;
1570 dh = kmalloc(len, GFP_KERNEL);
1571 if (!dh)
1572 return -ENOMEM;
1573
1574 rc = rbd_req_sync_read(rbd_dev,
1575 NULL, CEPH_NOSNAP,
1576 rbd_dev->obj_md_name,
1577 0, len,
59c2be1e 1578 (char *)dh, &ver);
602adf40
YS
1579 if (rc < 0)
1580 goto out_dh;
1581
1582 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
81e759fb
JD
1583 if (rc < 0) {
1584 if (rc == -ENXIO) {
1585 pr_warning("unrecognized header format"
1586 " for image %s", rbd_dev->obj);
1587 }
602adf40 1588 goto out_dh;
81e759fb 1589 }
602adf40
YS
1590
1591 if (snap_count != header->total_snaps) {
1592 snap_count = header->total_snaps;
1593 snap_names_len = header->snap_names_len;
1594 rbd_header_free(header);
1595 kfree(dh);
1596 continue;
1597 }
1598 break;
1599 }
59c2be1e 1600 header->obj_version = ver;
602adf40
YS
1601
1602out_dh:
1603 kfree(dh);
1604 return rc;
1605}
1606
1607/*
1608 * create a snapshot
1609 */
1610static int rbd_header_add_snap(struct rbd_device *dev,
1611 const char *snap_name,
1612 gfp_t gfp_flags)
1613{
1614 int name_len = strlen(snap_name);
1615 u64 new_snapid;
1616 int ret;
916d4d67 1617 void *data, *p, *e;
59c2be1e 1618 u64 ver;
1dbb4399 1619 struct ceph_mon_client *monc;
602adf40
YS
1620
1621 /* we should create a snapshot only if we're pointing at the head */
1622 if (dev->cur_snap)
1623 return -EINVAL;
1624
1dbb4399
AE
1625 monc = &dev->rbd_client->client->monc;
1626 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1627 dout("created snapid=%lld\n", new_snapid);
1628 if (ret < 0)
1629 return ret;
1630
1631 data = kmalloc(name_len + 16, gfp_flags);
1632 if (!data)
1633 return -ENOMEM;
1634
916d4d67
SW
1635 p = data;
1636 e = data + name_len + 16;
602adf40 1637
916d4d67
SW
1638 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1639 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1640
1641 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1642 data, p - data, &ver);
602adf40 1643
916d4d67 1644 kfree(data);
602adf40
YS
1645
1646 if (ret < 0)
1647 return ret;
1648
1649 dev->header.snapc->seq = new_snapid;
1650
1651 return 0;
1652bad:
1653 return -ERANGE;
1654}
1655
dfc5606d
YS
1656static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1657{
1658 struct rbd_snap *snap;
1659
1660 while (!list_empty(&rbd_dev->snaps)) {
1661 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1662 __rbd_remove_snap_dev(rbd_dev, snap);
1663 }
1664}
1665
602adf40
YS
1666/*
1667 * only read the first part of the ondisk header, without the snaps info
1668 */
dfc5606d 1669static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1670{
1671 int ret;
1672 struct rbd_image_header h;
1673 u64 snap_seq;
59c2be1e 1674 int follow_seq = 0;
602adf40
YS
1675
1676 ret = rbd_read_header(rbd_dev, &h);
1677 if (ret < 0)
1678 return ret;
1679
9db4b3e3
SW
1680 /* resized? */
1681 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1682
602adf40
YS
1683 down_write(&rbd_dev->header.snap_rwsem);
1684
1685 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1686 if (rbd_dev->header.total_snaps &&
1687 rbd_dev->header.snapc->snaps[0] == snap_seq)
1688 /* pointing at the head, will need to follow that
1689 if head moves */
1690 follow_seq = 1;
602adf40
YS
1691
1692 kfree(rbd_dev->header.snapc);
1693 kfree(rbd_dev->header.snap_names);
1694 kfree(rbd_dev->header.snap_sizes);
1695
1696 rbd_dev->header.total_snaps = h.total_snaps;
1697 rbd_dev->header.snapc = h.snapc;
1698 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1699 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1700 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1701 if (follow_seq)
1702 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1703 else
1704 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1705
dfc5606d
YS
1706 ret = __rbd_init_snaps_header(rbd_dev);
1707
602adf40
YS
1708 up_write(&rbd_dev->header.snap_rwsem);
1709
dfc5606d 1710 return ret;
602adf40
YS
1711}
1712
1713static int rbd_init_disk(struct rbd_device *rbd_dev)
1714{
1715 struct gendisk *disk;
1716 struct request_queue *q;
1717 int rc;
1718 u64 total_size = 0;
1719
1720 /* contact OSD, request size info about the object being mapped */
1721 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1722 if (rc)
1723 return rc;
1724
dfc5606d
YS
1725 /* no need to lock here, as rbd_dev is not registered yet */
1726 rc = __rbd_init_snaps_header(rbd_dev);
1727 if (rc)
1728 return rc;
1729
cc9d734c 1730 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1731 if (rc)
1732 return rc;
1733
1734 /* create gendisk info */
1735 rc = -ENOMEM;
1736 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1737 if (!disk)
1738 goto out;
1739
aedfec59
SW
1740 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1741 rbd_dev->id);
602adf40
YS
1742 disk->major = rbd_dev->major;
1743 disk->first_minor = 0;
1744 disk->fops = &rbd_bd_ops;
1745 disk->private_data = rbd_dev;
1746
1747 /* init rq */
1748 rc = -ENOMEM;
1749 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1750 if (!q)
1751 goto out_disk;
029bcbd8
JD
1752
1753 /* set io sizes to object size */
1754 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1755 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1756 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1757 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1758
602adf40
YS
1759 blk_queue_merge_bvec(q, rbd_merge_bvec);
1760 disk->queue = q;
1761
1762 q->queuedata = rbd_dev;
1763
1764 rbd_dev->disk = disk;
1765 rbd_dev->q = q;
1766
1767 /* finally, announce the disk to the world */
1768 set_capacity(disk, total_size / 512ULL);
1769 add_disk(disk);
1770
1771 pr_info("%s: added with size 0x%llx\n",
1772 disk->disk_name, (unsigned long long)total_size);
1773 return 0;
1774
1775out_disk:
1776 put_disk(disk);
1777out:
1778 return rc;
1779}
1780
dfc5606d
YS
1781/*
1782 sysfs
1783*/
1784
1785static ssize_t rbd_size_show(struct device *dev,
1786 struct device_attribute *attr, char *buf)
1787{
1788 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1789
1790 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1791}
1792
1793static ssize_t rbd_major_show(struct device *dev,
1794 struct device_attribute *attr, char *buf)
1795{
1796 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1797
dfc5606d
YS
1798 return sprintf(buf, "%d\n", rbd_dev->major);
1799}
1800
1801static ssize_t rbd_client_id_show(struct device *dev,
1802 struct device_attribute *attr, char *buf)
602adf40 1803{
dfc5606d
YS
1804 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1805
1dbb4399
AE
1806 return sprintf(buf, "client%lld\n",
1807 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1808}
1809
dfc5606d
YS
1810static ssize_t rbd_pool_show(struct device *dev,
1811 struct device_attribute *attr, char *buf)
602adf40 1812{
dfc5606d
YS
1813 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1814
1815 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1816}
1817
1818static ssize_t rbd_name_show(struct device *dev,
1819 struct device_attribute *attr, char *buf)
1820{
1821 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1822
1823 return sprintf(buf, "%s\n", rbd_dev->obj);
1824}
1825
1826static ssize_t rbd_snap_show(struct device *dev,
1827 struct device_attribute *attr,
1828 char *buf)
1829{
1830 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1831
1832 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1833}
1834
1835static ssize_t rbd_image_refresh(struct device *dev,
1836 struct device_attribute *attr,
1837 const char *buf,
1838 size_t size)
1839{
1840 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1841 int rc;
1842 int ret = size;
602adf40
YS
1843
1844 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1845
dfc5606d
YS
1846 rc = __rbd_update_snaps(rbd_dev);
1847 if (rc < 0)
1848 ret = rc;
602adf40 1849
dfc5606d
YS
1850 mutex_unlock(&ctl_mutex);
1851 return ret;
1852}
602adf40 1853
dfc5606d
YS
1854static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1855static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1856static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1857static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1858static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1859static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1860static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1861static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1862
1863static struct attribute *rbd_attrs[] = {
1864 &dev_attr_size.attr,
1865 &dev_attr_major.attr,
1866 &dev_attr_client_id.attr,
1867 &dev_attr_pool.attr,
1868 &dev_attr_name.attr,
1869 &dev_attr_current_snap.attr,
1870 &dev_attr_refresh.attr,
1871 &dev_attr_create_snap.attr,
dfc5606d
YS
1872 NULL
1873};
1874
1875static struct attribute_group rbd_attr_group = {
1876 .attrs = rbd_attrs,
1877};
1878
1879static const struct attribute_group *rbd_attr_groups[] = {
1880 &rbd_attr_group,
1881 NULL
1882};
1883
1884static void rbd_sysfs_dev_release(struct device *dev)
1885{
1886}
1887
1888static struct device_type rbd_device_type = {
1889 .name = "rbd",
1890 .groups = rbd_attr_groups,
1891 .release = rbd_sysfs_dev_release,
1892};
1893
1894
1895/*
1896 sysfs - snapshots
1897*/
1898
1899static ssize_t rbd_snap_size_show(struct device *dev,
1900 struct device_attribute *attr,
1901 char *buf)
1902{
1903 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1904
1905 return sprintf(buf, "%lld\n", (long long)snap->size);
1906}
1907
1908static ssize_t rbd_snap_id_show(struct device *dev,
1909 struct device_attribute *attr,
1910 char *buf)
1911{
1912 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1913
1914 return sprintf(buf, "%lld\n", (long long)snap->id);
1915}
1916
1917static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1918static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1919
1920static struct attribute *rbd_snap_attrs[] = {
1921 &dev_attr_snap_size.attr,
1922 &dev_attr_snap_id.attr,
1923 NULL,
1924};
1925
1926static struct attribute_group rbd_snap_attr_group = {
1927 .attrs = rbd_snap_attrs,
1928};
1929
1930static void rbd_snap_dev_release(struct device *dev)
1931{
1932 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1933 kfree(snap->name);
1934 kfree(snap);
1935}
1936
1937static const struct attribute_group *rbd_snap_attr_groups[] = {
1938 &rbd_snap_attr_group,
1939 NULL
1940};
1941
1942static struct device_type rbd_snap_device_type = {
1943 .groups = rbd_snap_attr_groups,
1944 .release = rbd_snap_dev_release,
1945};
1946
1947static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1948 struct rbd_snap *snap)
1949{
1950 list_del(&snap->node);
1951 device_unregister(&snap->dev);
1952}
1953
1954static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1955 struct rbd_snap *snap,
1956 struct device *parent)
1957{
1958 struct device *dev = &snap->dev;
1959 int ret;
1960
1961 dev->type = &rbd_snap_device_type;
1962 dev->parent = parent;
1963 dev->release = rbd_snap_dev_release;
1964 dev_set_name(dev, "snap_%s", snap->name);
1965 ret = device_register(dev);
1966
1967 return ret;
1968}
1969
1970static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1971 int i, const char *name,
1972 struct rbd_snap **snapp)
1973{
1974 int ret;
1975 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1976 if (!snap)
1977 return -ENOMEM;
1978 snap->name = kstrdup(name, GFP_KERNEL);
1979 snap->size = rbd_dev->header.snap_sizes[i];
1980 snap->id = rbd_dev->header.snapc->snaps[i];
1981 if (device_is_registered(&rbd_dev->dev)) {
1982 ret = rbd_register_snap_dev(rbd_dev, snap,
1983 &rbd_dev->dev);
1984 if (ret < 0)
1985 goto err;
1986 }
1987 *snapp = snap;
1988 return 0;
1989err:
1990 kfree(snap->name);
1991 kfree(snap);
1992 return ret;
1993}
1994
1995/*
1996 * search for the previous snap in a null delimited string list
1997 */
1998const char *rbd_prev_snap_name(const char *name, const char *start)
1999{
2000 if (name < start + 2)
2001 return NULL;
2002
2003 name -= 2;
2004 while (*name) {
2005 if (name == start)
2006 return start;
2007 name--;
2008 }
2009 return name + 1;
2010}
2011
2012/*
2013 * compare the old list of snapshots that we have to what's in the header
2014 * and update it accordingly. Note that the header holds the snapshots
2015 * in a reverse order (from newest to oldest) and we need to go from
2016 * older to new so that we don't get a duplicate snap name when
2017 * doing the process (e.g., removed snapshot and recreated a new
2018 * one with the same name.
2019 */
2020static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2021{
2022 const char *name, *first_name;
2023 int i = rbd_dev->header.total_snaps;
2024 struct rbd_snap *snap, *old_snap = NULL;
2025 int ret;
2026 struct list_head *p, *n;
2027
2028 first_name = rbd_dev->header.snap_names;
2029 name = first_name + rbd_dev->header.snap_names_len;
2030
2031 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2032 u64 cur_id;
2033
2034 old_snap = list_entry(p, struct rbd_snap, node);
2035
2036 if (i)
2037 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2038
2039 if (!i || old_snap->id < cur_id) {
2040 /* old_snap->id was skipped, thus was removed */
2041 __rbd_remove_snap_dev(rbd_dev, old_snap);
2042 continue;
2043 }
2044 if (old_snap->id == cur_id) {
2045 /* we have this snapshot already */
2046 i--;
2047 name = rbd_prev_snap_name(name, first_name);
2048 continue;
2049 }
2050 for (; i > 0;
2051 i--, name = rbd_prev_snap_name(name, first_name)) {
2052 if (!name) {
2053 WARN_ON(1);
2054 return -EINVAL;
2055 }
2056 cur_id = rbd_dev->header.snapc->snaps[i];
2057 /* snapshot removal? handle it above */
2058 if (cur_id >= old_snap->id)
2059 break;
2060 /* a new snapshot */
2061 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2062 if (ret < 0)
2063 return ret;
2064
2065 /* note that we add it backward so using n and not p */
2066 list_add(&snap->node, n);
2067 p = &snap->node;
2068 }
2069 }
2070 /* we're done going over the old snap list, just add what's left */
2071 for (; i > 0; i--) {
2072 name = rbd_prev_snap_name(name, first_name);
2073 if (!name) {
2074 WARN_ON(1);
2075 return -EINVAL;
2076 }
2077 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2078 if (ret < 0)
2079 return ret;
2080 list_add(&snap->node, &rbd_dev->snaps);
2081 }
2082
2083 return 0;
2084}
2085
2086
2087static void rbd_root_dev_release(struct device *dev)
2088{
2089}
2090
2091static struct device rbd_root_dev = {
2092 .init_name = "rbd",
2093 .release = rbd_root_dev_release,
2094};
2095
2096static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2097{
2098 int ret = -ENOMEM;
2099 struct device *dev;
2100 struct rbd_snap *snap;
2101
2102 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2103 dev = &rbd_dev->dev;
2104
2105 dev->bus = &rbd_bus_type;
2106 dev->type = &rbd_device_type;
2107 dev->parent = &rbd_root_dev;
2108 dev->release = rbd_dev_release;
2109 dev_set_name(dev, "%d", rbd_dev->id);
2110 ret = device_register(dev);
2111 if (ret < 0)
2112 goto done_free;
2113
2114 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2115 ret = rbd_register_snap_dev(rbd_dev, snap,
2116 &rbd_dev->dev);
2117 if (ret < 0)
602adf40
YS
2118 break;
2119 }
2120
2121 mutex_unlock(&ctl_mutex);
dfc5606d
YS
2122 return 0;
2123done_free:
2124 mutex_unlock(&ctl_mutex);
2125 return ret;
602adf40
YS
2126}
2127
dfc5606d
YS
2128static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2129{
2130 device_unregister(&rbd_dev->dev);
2131}
2132
59c2be1e
YS
2133static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2134{
2135 int ret, rc;
2136
2137 do {
2138 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2139 rbd_dev->header.obj_version);
2140 if (ret == -ERANGE) {
2141 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2142 rc = __rbd_update_snaps(rbd_dev);
2143 mutex_unlock(&ctl_mutex);
2144 if (rc < 0)
2145 return rc;
2146 }
2147 } while (ret == -ERANGE);
2148
2149 return ret;
2150}
2151
2152static ssize_t rbd_add(struct bus_type *bus,
2153 const char *buf,
2154 size_t count)
602adf40
YS
2155{
2156 struct ceph_osd_client *osdc;
2157 struct rbd_device *rbd_dev;
2158 ssize_t rc = -ENOMEM;
2159 int irc, new_id = 0;
2160 struct list_head *tmp;
2161 char *mon_dev_name;
2162 char *options;
2163
2164 if (!try_module_get(THIS_MODULE))
2165 return -ENODEV;
2166
2167 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2168 if (!mon_dev_name)
2169 goto err_out_mod;
2170
2171 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2172 if (!options)
2173 goto err_mon_dev;
2174
2175 /* new rbd_device object */
2176 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2177 if (!rbd_dev)
2178 goto err_out_opt;
2179
2180 /* static rbd_device initialization */
2181 spin_lock_init(&rbd_dev->lock);
2182 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2183 INIT_LIST_HEAD(&rbd_dev->snaps);
602adf40 2184
0e805a1d
AE
2185 init_rwsem(&rbd_dev->header.snap_rwsem);
2186
602adf40
YS
2187 /* generate unique id: find highest unique id, add one */
2188 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2189
2190 list_for_each(tmp, &rbd_dev_list) {
2191 struct rbd_device *rbd_dev;
2192
2193 rbd_dev = list_entry(tmp, struct rbd_device, node);
2194 if (rbd_dev->id >= new_id)
2195 new_id = rbd_dev->id + 1;
2196 }
2197
2198 rbd_dev->id = new_id;
2199
2200 /* add to global list */
2201 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2202
2203 /* parse add command */
2204 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2205 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2206 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2207 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2208 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2209 mon_dev_name, options, rbd_dev->pool_name,
2210 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2211 rc = -EINVAL;
2212 goto err_out_slot;
2213 }
2214
2215 if (rbd_dev->snap_name[0] == 0)
cc9d734c
JD
2216 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2217 sizeof (RBD_SNAP_HEAD_NAME));
602adf40
YS
2218
2219 rbd_dev->obj_len = strlen(rbd_dev->obj);
2220 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2221 rbd_dev->obj, RBD_SUFFIX);
2222
2223 /* initialize rest of new object */
2224 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2225 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2226 if (rc < 0)
2227 goto err_out_slot;
2228
2229 mutex_unlock(&ctl_mutex);
2230
2231 /* pick the pool */
1dbb4399 2232 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2233 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2234 if (rc < 0)
2235 goto err_out_client;
2236 rbd_dev->poolid = rc;
2237
2238 /* register our block device */
2239 irc = register_blkdev(0, rbd_dev->name);
2240 if (irc < 0) {
2241 rc = irc;
2242 goto err_out_client;
2243 }
2244 rbd_dev->major = irc;
2245
dfc5606d
YS
2246 rc = rbd_bus_add_dev(rbd_dev);
2247 if (rc)
766fc439
YS
2248 goto err_out_blkdev;
2249
602adf40
YS
2250 /* set up and announce blkdev mapping */
2251 rc = rbd_init_disk(rbd_dev);
2252 if (rc)
766fc439 2253 goto err_out_bus;
602adf40 2254
59c2be1e
YS
2255 rc = rbd_init_watch_dev(rbd_dev);
2256 if (rc)
2257 goto err_out_bus;
2258
602adf40
YS
2259 return count;
2260
766fc439
YS
2261err_out_bus:
2262 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2263 list_del_init(&rbd_dev->node);
2264 mutex_unlock(&ctl_mutex);
2265
2266 /* this will also clean up rest of rbd_dev stuff */
2267
2268 rbd_bus_del_dev(rbd_dev);
2269 kfree(options);
2270 kfree(mon_dev_name);
2271 return rc;
2272
602adf40
YS
2273err_out_blkdev:
2274 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2275err_out_client:
2276 rbd_put_client(rbd_dev);
2277 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2278err_out_slot:
2279 list_del_init(&rbd_dev->node);
2280 mutex_unlock(&ctl_mutex);
2281
2282 kfree(rbd_dev);
2283err_out_opt:
2284 kfree(options);
2285err_mon_dev:
2286 kfree(mon_dev_name);
2287err_out_mod:
2288 dout("Error adding device %s\n", buf);
2289 module_put(THIS_MODULE);
2290 return rc;
2291}
2292
2293static struct rbd_device *__rbd_get_dev(unsigned long id)
2294{
2295 struct list_head *tmp;
2296 struct rbd_device *rbd_dev;
2297
2298 list_for_each(tmp, &rbd_dev_list) {
2299 rbd_dev = list_entry(tmp, struct rbd_device, node);
2300 if (rbd_dev->id == id)
2301 return rbd_dev;
2302 }
2303 return NULL;
2304}
2305
dfc5606d 2306static void rbd_dev_release(struct device *dev)
602adf40 2307{
dfc5606d
YS
2308 struct rbd_device *rbd_dev =
2309 container_of(dev, struct rbd_device, dev);
602adf40 2310
1dbb4399
AE
2311 if (rbd_dev->watch_request) {
2312 struct ceph_client *client = rbd_dev->rbd_client->client;
2313
2314 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2315 rbd_dev->watch_request);
1dbb4399 2316 }
59c2be1e 2317 if (rbd_dev->watch_event)
79e3057c 2318 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
59c2be1e 2319
602adf40
YS
2320 rbd_put_client(rbd_dev);
2321
2322 /* clean up and free blkdev */
2323 rbd_free_disk(rbd_dev);
2324 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2325 kfree(rbd_dev);
2326
2327 /* release module ref */
2328 module_put(THIS_MODULE);
602adf40
YS
2329}
2330
dfc5606d
YS
2331static ssize_t rbd_remove(struct bus_type *bus,
2332 const char *buf,
2333 size_t count)
602adf40
YS
2334{
2335 struct rbd_device *rbd_dev = NULL;
2336 int target_id, rc;
2337 unsigned long ul;
2338 int ret = count;
2339
2340 rc = strict_strtoul(buf, 10, &ul);
2341 if (rc)
2342 return rc;
2343
2344 /* convert to int; abort if we lost anything in the conversion */
2345 target_id = (int) ul;
2346 if (target_id != ul)
2347 return -EINVAL;
2348
2349 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2350
2351 rbd_dev = __rbd_get_dev(target_id);
2352 if (!rbd_dev) {
2353 ret = -ENOENT;
2354 goto done;
2355 }
2356
dfc5606d
YS
2357 list_del_init(&rbd_dev->node);
2358
2359 __rbd_remove_all_snaps(rbd_dev);
2360 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2361
2362done:
2363 mutex_unlock(&ctl_mutex);
2364 return ret;
2365}
2366
dfc5606d
YS
2367static ssize_t rbd_snap_add(struct device *dev,
2368 struct device_attribute *attr,
2369 const char *buf,
2370 size_t count)
602adf40 2371{
dfc5606d
YS
2372 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2373 int ret;
2374 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2375 if (!name)
2376 return -ENOMEM;
2377
dfc5606d 2378 snprintf(name, count, "%s", buf);
602adf40
YS
2379
2380 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2381
602adf40
YS
2382 ret = rbd_header_add_snap(rbd_dev,
2383 name, GFP_KERNEL);
2384 if (ret < 0)
59c2be1e 2385 goto err_unlock;
602adf40 2386
dfc5606d 2387 ret = __rbd_update_snaps(rbd_dev);
602adf40 2388 if (ret < 0)
59c2be1e
YS
2389 goto err_unlock;
2390
2391 /* shouldn't hold ctl_mutex when notifying.. notify might
2392 trigger a watch callback that would need to get that mutex */
2393 mutex_unlock(&ctl_mutex);
2394
2395 /* make a best effort, don't error if failed */
2396 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2397
2398 ret = count;
59c2be1e
YS
2399 kfree(name);
2400 return ret;
2401
2402err_unlock:
602adf40 2403 mutex_unlock(&ctl_mutex);
602adf40
YS
2404 kfree(name);
2405 return ret;
2406}
2407
dfc5606d
YS
2408static struct bus_attribute rbd_bus_attrs[] = {
2409 __ATTR(add, S_IWUSR, NULL, rbd_add),
2410 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
602adf40
YS
2411 __ATTR_NULL
2412};
2413
2414/*
2415 * create control files in sysfs
dfc5606d 2416 * /sys/bus/rbd/...
602adf40
YS
2417 */
2418static int rbd_sysfs_init(void)
2419{
dfc5606d 2420 int ret;
602adf40 2421
dfc5606d 2422 rbd_bus_type.bus_attrs = rbd_bus_attrs;
602adf40 2423
dfc5606d 2424 ret = bus_register(&rbd_bus_type);
21079786 2425 if (ret < 0)
dfc5606d 2426 return ret;
602adf40 2427
dfc5606d 2428 ret = device_register(&rbd_root_dev);
602adf40 2429
602adf40
YS
2430 return ret;
2431}
2432
2433static void rbd_sysfs_cleanup(void)
2434{
dfc5606d
YS
2435 device_unregister(&rbd_root_dev);
2436 bus_unregister(&rbd_bus_type);
602adf40
YS
2437}
2438
2439int __init rbd_init(void)
2440{
2441 int rc;
2442
2443 rc = rbd_sysfs_init();
2444 if (rc)
2445 return rc;
602adf40
YS
2446 pr_info("loaded " DRV_NAME_LONG "\n");
2447 return 0;
2448}
2449
2450void __exit rbd_exit(void)
2451{
2452 rbd_sysfs_cleanup();
2453}
2454
2455module_init(rbd_init);
2456module_exit(rbd_exit);
2457
2458MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2459MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2460MODULE_DESCRIPTION("rados block device");
2461
2462/* following authorship retained from original osdblk.c */
2463MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2464
2465MODULE_LICENSE("GPL");