2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/* NOTE(review): fragmentary excerpt — original kernel line numbers are fused
 * into each line and intervening lines are missing; not compilable as-is. */
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* Driver identity strings used for block-device and sysfs registration. */
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
/* Sentinel snapshot name meaning "no snapshot, use the live head image". */
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/* Default watch/notify timeout in seconds (overridable via mount option). */
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/* NOTE(review): the struct definitions below are fragmentary — most fields,
 * braces, and several struct headers are missing from this excerpt. */
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
/* An rbd_client wraps one ceph_client; multiple rbd devices may share it. */
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
/* Per-sub-request completion record (fields not visible in this excerpt). */
108 * a request completion status
110 struct rbd_req_status {
/* A collection groups the per-segment sub-requests of one blk request;
 * status[] is a trailing variable-length array sized by the request count. */
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
/* A single in-flight OSD I/O, tied back to its blk request and collection. */
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
/* Per-device state (struct rbd_device): block-device identity, the shared
 * client, the cached image header, and watch/notify bookkeeping. */
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
/* Module-wide state: one mutex serializing control operations, plus two
 * spinlock-protected lists of devices and of shared rados clients. */
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for functions defined later in the file. */
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* sysfs bus interface: write-only "add"/"remove" attributes on the rbd bus
 * are the user-facing control plane (see Documentation/ABI/testing/sysfs-bus-rbd). */
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
/* Root device anchoring all rbd devices in the sysfs hierarchy; the empty
 * release callback is intentional — the root device is statically allocated. */
223 static void rbd_root_dev_release(struct device *dev)
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
/* Take/drop a reference on the rbd device's embedded struct device. */
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
/* Block-device open: refuse writable opens of a read-only mapping (snapshot
 * mappings are read-only), then pin the device and propagate the ro flag.
 * NOTE(review): the error-return path for the FMODE_WRITE check is missing
 * from this excerpt. */
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
/* Block-device release: drop the reference taken in rbd_open(). */
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 struct rbd_device *rbd_dev = disk->private_data;
262 rbd_put_dev(rbd_dev);
/* Open/release hooks registered with the block layer (.open line missing
 * from this excerpt). */
267 static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
270 .release = rbd_release,
274 * Initialize an rbd client instance.
/* Allocates an rbd_client, creates and opens a ceph client session under
 * ctl_mutex, then adds the client to the shared rbd_client_list.  On
 * success ownership of both ceph_opts and rbd_opts passes to the client.
 * NOTE(review): the error-path labels and return statements are missing
 * from this excerpt; the visible tail frees partially-built state. */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 struct rbd_options *rbd_opts)
280 struct rbd_client *rbdc;
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
/* Nested annotation: ctl_mutex may already be held by the mount path. */
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 if (IS_ERR(rbdc->client))
296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 ret = ceph_open_session(rbdc->client);
302 rbdc->rbd_opts = rbd_opts;
304 spin_lock(&rbd_client_list_lock);
305 list_add_tail(&rbdc->node, &rbd_client_list);
306 spin_unlock(&rbd_client_list_lock);
308 mutex_unlock(&ctl_mutex);
310 dout("rbd_client_create created %p\n", rbdc);
/* Error unwinding (labels not visible in this excerpt). */
314 ceph_destroy_client(rbdc->client);
316 mutex_unlock(&ctl_mutex);
320 ceph_destroy_options(ceph_opts);
325 * Find a ceph client with specific addr and configuration. If
326 * found, bump its reference count.
/* Returns an existing shared client whose options match, with its kref
 * bumped, or NULL.  Sharing is skipped when CEPH_OPT_NOSHARE is set.
 * NOTE(review): the "found" flag assignment and early-return lines are
 * missing from this excerpt. */
328 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
330 struct rbd_client *client_node;
333 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
336 spin_lock(&rbd_client_list_lock);
337 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() returns 0 on a match. */
338 if (!ceph_compare_options(ceph_opts, client_node->client)) {
339 kref_get(&client_node->kref);
344 spin_unlock(&rbd_client_list_lock);
346 return found ? client_node : NULL;
357 /* string args above */
/* Token table for rbd-specific mount options; only notify_timeout is
 * visible in this excerpt (the Opt_* enum itself is missing). */
360 static match_table_t rbd_opts_tokens = {
361 {Opt_notify_timeout, "notify_timeout=%d"},
363 /* string args above */
/* Callback invoked by ceph_parse_options() for each unrecognized option
 * token; parses integer/string arguments and stores them into the
 * rbd_options passed via @private.  NOTE(review): error returns and the
 * default switch case are missing from this excerpt. */
367 static int parse_rbd_opts_token(char *c, void *private)
369 struct rbd_options *rbd_opts = private;
370 substring_t argstr[MAX_OPT_ARGS];
371 int token, intval, ret;
373 token = match_token(c, rbd_opts_tokens, argstr);
/* Tokens below Opt_last_int carry an integer argument. */
377 if (token < Opt_last_int) {
378 ret = match_int(&argstr[0], &intval);
380 pr_err("bad mount option arg (not int) "
384 dout("got int token %d val %d\n", token, intval);
385 } else if (token > Opt_last_int && token < Opt_last_string) {
386 dout("got string token %d val %s\n", token,
389 dout("got token %d\n", token);
393 case Opt_notify_timeout:
394 rbd_opts->notify_timeout = intval;
403 * Get a ceph client with specific addr and configuration, if one does
404 * not exist create it.
/* Allocates rbd_options with defaults, parses the option string into
 * ceph_options (rbd-specific tokens routed to parse_rbd_opts_token),
 * then either reuses a matching shared client or creates a new one.
 * NOTE(review): kfree(rbd_opts) on the reuse/error paths and the final
 * return are missing from this excerpt. */
406 static struct rbd_client *rbd_get_client(const char *mon_addr,
410 struct rbd_client *rbdc;
411 struct ceph_options *ceph_opts;
412 struct rbd_options *rbd_opts;
414 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
416 return ERR_PTR(-ENOMEM);
418 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
420 ceph_opts = ceph_parse_options(options, mon_addr,
421 mon_addr + mon_addr_len,
422 parse_rbd_opts_token, rbd_opts);
423 if (IS_ERR(ceph_opts)) {
425 return ERR_CAST(ceph_opts);
428 rbdc = rbd_client_find(ceph_opts);
430 /* using an existing client */
431 ceph_destroy_options(ceph_opts);
437 rbdc = rbd_client_create(ceph_opts, rbd_opts);
445 * Destroy ceph client
447 * Caller must hold rbd_client_list_lock.
/* kref release callback: unlinks the client from the shared list, tears
 * down the ceph client, and frees the option struct it owned.
 * NOTE(review): the stale "Caller must hold rbd_client_list_lock" comment
 * above appears to contradict the lock being taken here — confirm against
 * the full source. */
449 static void rbd_client_release(struct kref *kref)
451 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
453 dout("rbd_release_client %p\n", rbdc);
454 spin_lock(&rbd_client_list_lock);
455 list_del(&rbdc->node);
456 spin_unlock(&rbd_client_list_lock);
458 ceph_destroy_client(rbdc->client);
459 kfree(rbdc->rbd_opts);
/* Drop a device's reference to its shared client; frees it on last put. */
464 * Drop reference to ceph client node. If it's not referenced anymore, release
467 static void rbd_put_client(struct rbd_device *rbd_dev)
469 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470 rbd_dev->rbd_client = NULL;
/* kref release callback for a request collection. */
474 * Destroy requests collection
476 static void rbd_coll_release(struct kref *kref)
478 struct rbd_req_coll *coll =
479 container_of(kref, struct rbd_req_coll, kref);
481 dout("rbd_coll_release %p\n", coll);
/* Sanity-check an on-disk image header: magic text must match, and the
 * snapshot count / names length must not overflow a size_t when the
 * in-memory snapshot context is later allocated.
 * NOTE(review): the "return false"/"return true" lines are missing from
 * this excerpt. */
485 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
490 /* The header has to start with the magic rbd header text */
491 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
495 * The size of a snapshot header has to fit in a size_t, and
496 * that limits the number of snapshots.
498 snap_count = le32_to_cpu(ondisk->snap_count);
499 size = SIZE_MAX - sizeof (struct ceph_snap_context);
500 if (snap_count > size / sizeof (__le64))
504 * Not only that, but the size of the entire the snapshot
505 * header must also be representable in a size_t.
507 size -= snap_count * sizeof (__le64);
508 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
515 * Create a new header structure, translate header format from the on-disk
/* Converts little-endian on-disk header fields into the in-memory
 * rbd_image_header: object prefix string, snapshot names/sizes arrays,
 * image geometry, and a freshly allocated ceph_snap_context.  All
 * allocations are unwound on failure (tail of the function).
 * NOTE(review): several branch/return lines are missing from this
 * excerpt; the snap_count == 0 branch is the one setting the NULL fields
 * around line 550. */
518 static int rbd_header_from_disk(struct rbd_image_header *header,
519 struct rbd_image_header_ondisk *ondisk)
525 memset(header, 0, sizeof (*header));
527 snap_count = le32_to_cpu(ondisk->snap_count);
/* Copy the NUL-terminated object-name prefix out of the fixed field. */
529 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
530 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
531 if (!header->object_prefix)
533 memcpy(header->object_prefix, ondisk->object_prefix, len);
534 header->object_prefix[len] = '\0';
537 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
/* Validated earlier by rbd_dev_ondisk_valid(), so this fits in size_t. */
538 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
539 header->snap_names = kmalloc(header->snap_names_len,
541 if (!header->snap_names)
544 size = snap_count * sizeof (*header->snap_sizes);
545 header->snap_sizes = kmalloc(size, GFP_KERNEL);
546 if (!header->snap_sizes)
/* No snapshots: names length must also be zero; leave arrays NULL. */
549 WARN_ON(ondisk->snap_names_len);
550 header->snap_names_len = 0;
551 header->snap_names = NULL;
552 header->snap_sizes = NULL;
555 header->image_size = le64_to_cpu(ondisk->image_size);
556 header->obj_order = ondisk->options.order;
557 header->crypt_type = ondisk->options.crypt_type;
558 header->comp_type = ondisk->options.comp_type;
559 header->total_snaps = snap_count;
/* Snap context is a header plus a trailing array of snapshot ids. */
561 size = sizeof (struct ceph_snap_context);
562 size += snap_count * sizeof (header->snapc->snaps[0]);
563 header->snapc = kzalloc(size, GFP_KERNEL);
567 atomic_set(&header->snapc->nref, 1);
568 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
569 header->snapc->num_snaps = snap_count;
571 /* Fill in the snapshot information */
576 for (i = 0; i < snap_count; i++) {
577 header->snapc->snaps[i] =
578 le64_to_cpu(ondisk->snaps[i].id);
579 header->snap_sizes[i] =
580 le64_to_cpu(ondisk->snaps[i].image_size);
583 /* copy snapshot names */
584 memcpy(header->snap_names, &ondisk->snaps[snap_count],
585 header->snap_names_len);
/* Error unwinding: free everything allocated above, newest first. */
591 kfree(header->snap_sizes);
592 header->snap_sizes = NULL;
593 kfree(header->snap_names);
594 header->snap_names = NULL;
595 header->snap_names_len = 0;
596 kfree(header->object_prefix);
597 header->object_prefix = NULL;
/* Linear search of the packed NUL-separated snapshot-name block; on a
 * match, passes back the snapshot id (seq) and/or size via out params.
 * NOTE(review): the return statements are missing from this excerpt. */
602 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
606 char *p = header->snap_names;
608 for (i = 0; i < header->total_snaps; i++) {
609 if (!strcmp(snap_name, p)) {
611 /* Found it. Pass back its id and/or size */
614 *seq = header->snapc->snaps[i];
616 *size = header->snap_sizes[i];
619 p += strlen(p) + 1; /* Skip ahead to the next name */
/* Bind the device to its configured snapshot (or the live head).  The
 * head mapping is writable with snap_id == CEPH_NOSNAP; any named
 * snapshot mapping is forced read-only.  Writes under header_rwsem. */
624 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
628 down_write(&rbd_dev->header_rwsem);
630 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
631 sizeof (RBD_SNAP_HEAD_NAME))) {
632 rbd_dev->snap_id = CEPH_NOSNAP;
633 rbd_dev->snap_exists = false;
634 rbd_dev->read_only = 0;
636 *size = rbd_dev->header.image_size;
640 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
644 rbd_dev->snap_id = snap_id;
645 rbd_dev->snap_exists = true;
646 rbd_dev->read_only = 1;
651 up_write(&rbd_dev->header_rwsem);
/* Free everything rbd_header_from_disk() allocated and NULL the fields
 * so the header can safely be freed twice / reused. */
655 static void rbd_header_free(struct rbd_image_header *header)
657 kfree(header->object_prefix);
658 header->object_prefix = NULL;
659 kfree(header->snap_sizes);
660 header->snap_sizes = NULL;
661 kfree(header->snap_names);
662 header->snap_names = NULL;
663 header->snap_names_len = 0;
664 ceph_put_snap_context(header->snapc);
665 header->snapc = NULL;
669 * get the actual striped segment name, offset and length
/* Maps an image byte offset to its backing RADOS object: object index is
 * ofs >> obj_order, name is "<prefix>.<012-hex index>", and the length is
 * clamped so it never crosses the object boundary.
 * NOTE(review): the lines storing the clamped offset/length and the
 * return are missing from this excerpt. */
671 static u64 rbd_get_segment(struct rbd_image_header *header,
672 const char *object_prefix,
674 char *seg_name, u64 *segofs)
676 u64 seg = ofs >> header->obj_order;
679 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
680 "%s.%012llx", object_prefix, seg);
682 ofs = ofs & ((1 << header->obj_order) - 1);
683 len = min_t(u64, len, (1 << header->obj_order) - ofs);
/* Number of backing objects an [ofs, ofs+len) extent touches. */
691 static int rbd_get_num_segments(struct rbd_image_header *header,
694 u64 start_seg = ofs >> header->obj_order;
695 u64 end_seg = (ofs + len - 1) >> header->obj_order;
696 return end_seg - start_seg + 1;
700 * returns the size of an object in the image
702 static u64 rbd_obj_bytes(struct rbd_image_header *header)
704 return 1 << header->obj_order;
/* Drop a reference on every bio in a chained list.
 * NOTE(review): the bio_put() call inside the loop is missing from this
 * excerpt. */
711 static void bio_chain_put(struct bio *chain)
717 chain = chain->bi_next;
723 * zeros a bio chain, starting at specific offset
/* Walks every segment of every bio in the chain and zeroes the bytes at
 * or beyond start_ofs (used to zero-fill short/ENOENT reads).  Segment
 * pages are mapped with bvec_kmap_irq, so this is atomic-context safe. */
725 static void zero_bio_chain(struct bio *chain, int start_ofs)
734 bio_for_each_segment(bv, chain, i) {
735 if (pos + bv->bv_len > start_ofs) {
/* Partial segment: only zero from start_ofs onward. */
736 int remainder = max(start_ofs - pos, 0);
737 buf = bvec_kmap_irq(bv, &flags);
738 memset(buf + remainder, 0,
739 bv->bv_len - remainder);
740 bvec_kunmap_irq(buf, &flags);
745 chain = chain->bi_next;
750 * bio_chain_clone - clone a chain of bios up to a certain length.
751 * might return a bio_pair that will need to be released.
/* Clones bios from *old into a new chain until @len bytes are covered.
 * A bio straddling the boundary is split with bio_split(); *next is set
 * to the first un-cloned bio so the caller can continue from there.
 * NOTE(review): heavily fragmented — error paths, the tail-linking
 * logic, and the final returns are missing from this excerpt; the
 * apparent "bp = bio_split(...)" assigning to the double pointer cannot
 * be judged without the missing lines. */
753 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
754 struct bio_pair **bp,
755 int len, gfp_t gfpmask)
757 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* Release any split left over from the previous call. */
761 bio_pair_release(*bp);
765 while (old_chain && (total < len)) {
766 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
770 if (total + old_chain->bi_size > len) {
774 * this split can only happen with a single paged bio,
775 * split_bio will BUG_ON if this is not the case
777 dout("bio_chain_clone split! total=%d remaining=%d"
779 total, len - total, old_chain->bi_size);
781 /* split the bio. We'll release it either in the next
782 call, or it will have to be released outside */
783 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
787 __bio_clone(tmp, &bp->bio1);
791 __bio_clone(tmp, old_chain);
792 *next = old_chain->bi_next;
/* Retry allocation without blocking flags after the first pass. */
796 gfpmask &= ~__GFP_WAIT;
800 new_chain = tail = tmp;
805 old_chain = old_chain->bi_next;
807 total += tmp->bi_size;
813 tail->bi_next = NULL;
820 dout("bio_chain_clone with err\n");
821 bio_chain_put(new_chain);
826 * helpers for osd request op vectors.
/* Allocate a zeroed, NULL-terminated vector of num_ops+1 OSD ops with the
 * first op's opcode and payload length filled in.
 * NOTE(review): the opcode assignment and return are missing from this
 * excerpt. */
828 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
829 int opcode, u32 payload_len)
831 struct ceph_osd_req_op *ops;
833 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
840 * op extent offset and length will be set later on
841 * in calc_raw_layout()
843 ops[0].payload_len = payload_len;
848 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/* Record completion of sub-request @index in its collection and complete
 * the blk request for every contiguous finished prefix, under the queue
 * lock.  With no collection (coll == NULL) the whole request is ended
 * immediately. */
853 static void rbd_coll_end_req_index(struct request *rq,
854 struct rbd_req_coll *coll,
858 struct request_queue *q;
861 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
862 coll, index, ret, (unsigned long long) len);
868 blk_end_request(rq, ret, len);
874 spin_lock_irq(q->queue_lock);
875 coll->status[index].done = 1;
876 coll->status[index].rc = ret;
877 coll->status[index].bytes = len;
878 max = min = coll->num_done;
/* Advance max over the run of already-completed sub-requests. */
879 while (max < coll->total && coll->status[max].done)
882 for (i = min; i<max; i++) {
883 __blk_end_request(rq, coll->status[i].rc,
884 coll->status[i].bytes);
886 kref_put(&coll->kref, rbd_coll_release);
888 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper completing a request at its own collection index. */
891 static void rbd_coll_end_req(struct rbd_request *req,
894 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
898 * Send ceph osd request
/* Core request path: builds a ceph_osd_request for @object_name covering
 * [ofs, ofs+len), attaches pages or a bio, computes the raw file layout
 * for the device's pool, and starts it.  With a callback (@rbd_cb) the
 * request is asynchronous; without one it is awaited synchronously and
 * the reassert version is passed back through *ver.  @linger_req marks
 * the request as lingering (used by watch).
 * NOTE(review): fragmented — error paths, the snapid assignment to the
 * request head, and several cleanup lines are missing from this excerpt. */
900 static int rbd_do_request(struct request *rq,
901 struct rbd_device *rbd_dev,
902 struct ceph_snap_context *snapc,
904 const char *object_name, u64 ofs, u64 len,
909 struct ceph_osd_req_op *ops,
910 struct rbd_req_coll *coll,
912 void (*rbd_cb)(struct ceph_osd_request *req,
913 struct ceph_msg *msg),
914 struct ceph_osd_request **linger_req,
917 struct ceph_osd_request *req;
918 struct ceph_file_layout *layout;
921 struct timespec mtime = CURRENT_TIME;
922 struct rbd_request *req_data;
923 struct ceph_osd_request_head *reqhead;
924 struct ceph_osd_client *osdc;
926 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failure still completes the collection slot with an error. */
929 rbd_coll_end_req_index(rq, coll, coll_index,
935 req_data->coll = coll;
936 req_data->coll_index = coll_index;
939 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
940 (unsigned long long) ofs, (unsigned long long) len);
942 osdc = &rbd_dev->rbd_client->client->osdc;
943 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
944 false, GFP_NOIO, pages, bio);
950 req->r_callback = rbd_cb;
954 req_data->pages = pages;
957 req->r_priv = req_data;
959 reqhead = req->r_request->front.iov_base;
960 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
/* NOTE(review): strncpy may leave r_oid unterminated if object_name fills
 * the buffer — confirm against the full source / upstream fix. */
962 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
963 req->r_oid_len = strlen(req->r_oid);
/* One object per stripe: unit == object size == 1 << RBD_MAX_OBJ_ORDER. */
965 layout = &req->r_file_layout;
966 memset(layout, 0, sizeof(*layout));
967 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
968 layout->fl_stripe_count = cpu_to_le32(1);
969 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
970 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
971 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
974 ceph_osdc_build_request(req, ofs, &len,
978 req->r_oid, req->r_oid_len);
981 ceph_osdc_set_request_linger(osdc, req);
985 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait for completion and report the version. */
990 ret = ceph_osdc_wait_request(osdc, req);
992 *ver = le64_to_cpu(req->r_reassert_version.version);
993 dout("reassert_ver=%llu\n",
995 le64_to_cpu(req->r_reassert_version.version));
996 ceph_osdc_put_request(req);
/* Error unwinding (labels not visible in this excerpt). */
1001 bio_chain_put(req_data->bio);
1002 ceph_osdc_put_request(req);
1004 rbd_coll_end_req(req_data, ret, len);
1010 * Ceph osd op callback
/* Async completion callback: decodes the reply head, zero-fills the bio
 * for reads that returned ENOENT (hole) or came up short, then completes
 * the collection slot and drops request resources. */
1012 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1014 struct rbd_request *req_data = req->r_priv;
1015 struct ceph_osd_reply_head *replyhead;
1016 struct ceph_osd_op *op;
1022 replyhead = msg->front.iov_base;
1023 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1024 op = (void *)(replyhead + 1);
1025 rc = le32_to_cpu(replyhead->result);
1026 bytes = le64_to_cpu(op->extent.length);
1027 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1029 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1030 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object on read == all zeroes, not an error. */
1032 if (rc == -ENOENT && read_op) {
1033 zero_bio_chain(req_data->bio, 0);
1035 } else if (rc == 0 && read_op && bytes < req_data->len) {
/* Short read: zero the tail and report the full length. */
1036 zero_bio_chain(req_data->bio, bytes);
1037 bytes = req_data->len;
1040 rbd_coll_end_req(req_data, rc, bytes);
1043 bio_chain_put(req_data->bio);
1045 ceph_osdc_put_request(req);
/* Minimal callback for fire-and-forget requests: just drop the ref. */
1049 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1051 ceph_osdc_put_request(req);
1055 * Do a synchronous ceph osd operation
/* Synchronous wrapper around rbd_do_request(): allocates a page vector
 * sized for [ofs, ofs+len), issues the op without a callback (so it is
 * awaited), and for reads copies the result into @buf.
 * NOTE(review): the middle argument lines of the rbd_do_request() call
 * and the error checks are missing from this excerpt. */
1057 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1058 struct ceph_snap_context *snapc,
1061 struct ceph_osd_req_op *ops,
1062 const char *object_name,
1065 struct ceph_osd_request **linger_req,
1069 struct page **pages;
1072 BUG_ON(ops == NULL);
1074 num_pages = calc_pages_for(ofs , len);
1075 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1077 return PTR_ERR(pages);
1079 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1080 object_name, ofs, len, NULL,
/* Read path: copy however many bytes the OSD returned into the caller's
 * buffer. */
1090 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1091 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1094 ceph_release_page_vector(pages, num_pages);
1099 * Do an asynchronous ceph osd operation
/* Async per-segment I/O: translates the image offset to a backing object
 * (name/offset/length), builds a single-op vector, and dispatches it via
 * rbd_do_request() with rbd_req_cb as the completion callback.
 * NOTE(review): several argument lines and the error/cleanup paths are
 * missing from this excerpt. */
1101 static int rbd_do_op(struct request *rq,
1102 struct rbd_device *rbd_dev,
1103 struct ceph_snap_context *snapc,
1105 int opcode, int flags,
1108 struct rbd_req_coll *coll,
1115 struct ceph_osd_req_op *ops;
1118 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1122 seg_len = rbd_get_segment(&rbd_dev->header,
1123 rbd_dev->header.object_prefix,
1125 seg_name, &seg_ofs);
/* Only writes carry a data payload in the op. */
1127 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1130 ops = rbd_create_rw_ops(1, opcode, payload_len);
1134 /* we've taken care of segment sizes earlier when we
1135 cloned the bios. We should never have a segment
1136 truncated at this point */
1137 BUG_ON(seg_len < len);
1139 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1140 seg_name, seg_ofs, seg_len,
1146 rbd_req_cb, 0, NULL);
1148 rbd_destroy_ops(ops);
/* Async write to the head image: always CEPH_NOSNAP, write+ondisk flags. */
1155 * Request async osd write
1157 static int rbd_req_write(struct request *rq,
1158 struct rbd_device *rbd_dev,
1159 struct ceph_snap_context *snapc,
1162 struct rbd_req_coll *coll,
1165 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1167 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1168 ofs, len, bio, coll, coll_index);
/* Async read: no snap context needed (reads use the device's snap_id,
 * per the missing argument lines). */
1172 * Request async osd read
1174 static int rbd_req_read(struct request *rq,
1175 struct rbd_device *rbd_dev,
1179 struct rbd_req_coll *coll,
1182 return rbd_do_op(rq, rbd_dev, NULL,
1186 ofs, len, bio, coll, coll_index);
1190 * Request sync osd read
/* Synchronous read of @object_name into @buf (used for header objects). */
1192 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1194 const char *object_name,
1199 struct ceph_osd_req_op *ops;
1202 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1206 ret = rbd_req_sync_op(rbd_dev, NULL,
1209 ops, object_name, ofs, len, buf, NULL, ver);
1210 rbd_destroy_ops(ops);
/* Acknowledge a watch notification so the OSD stops re-sending it. */
1216 * Request sync osd watch
1218 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1222 struct ceph_osd_req_op *ops;
1225 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1229 ops[0].watch.ver = cpu_to_le64(ver);
1230 ops[0].watch.cookie = notify_id;
1231 ops[0].watch.flag = 0;
1233 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1234 rbd_dev->header_name, 0, 0, NULL,
1239 rbd_simple_req_cb, 0, NULL);
1241 rbd_destroy_ops(ops);
/* Watch callback: a header-object notification means the image changed;
 * refresh the cached header, then ack the notification. */
1245 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1247 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1254 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1255 rbd_dev->header_name, (unsigned long long) notify_id,
1256 (unsigned int) opcode);
1257 rc = rbd_refresh_header(rbd_dev, &hver);
1259 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1260 " update snaps: %d\n", rbd_dev->major, rc);
1262 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1266 * Request sync osd watch
/* Establish a watch on the image header object: create an OSD event
 * delivering to rbd_watch_cb, then issue a lingering WATCH op keyed to
 * the event cookie.  On failure the event is cancelled (tail lines). */
1268 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1270 struct ceph_osd_req_op *ops;
1271 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1274 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1278 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1279 (void *)rbd_dev, &rbd_dev->watch_event);
1283 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1284 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1285 ops[0].watch.flag = 1;
1287 ret = rbd_req_sync_op(rbd_dev, NULL,
1289 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1291 rbd_dev->header_name,
1293 &rbd_dev->watch_request, NULL);
1298 rbd_destroy_ops(ops);
/* Error path: tear down the event we just created. */
1302 ceph_osdc_cancel_event(rbd_dev->watch_event);
1303 rbd_dev->watch_event = NULL;
1305 rbd_destroy_ops(ops);
/* Tear down the header watch: issue WATCH with flag 0 (unwatch) using
 * the existing event cookie, then cancel the event. */
1310 * Request sync osd unwatch
1312 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1314 struct ceph_osd_req_op *ops;
1317 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1321 ops[0].watch.ver = 0;
1322 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1323 ops[0].watch.flag = 0;
1325 ret = rbd_req_sync_op(rbd_dev, NULL,
1327 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1329 rbd_dev->header_name,
1330 0, 0, NULL, NULL, NULL);
1333 rbd_destroy_ops(ops);
1334 ceph_osdc_cancel_event(rbd_dev->watch_event);
1335 rbd_dev->watch_event = NULL;
/* Context passed to rbd_notify_cb while waiting for our own notify. */
1339 struct rbd_notify_info {
1340 struct rbd_device *rbd_dev;
/* Callback fired when our own NOTIFY completes; only logs. */
1343 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1345 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1349 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1350 rbd_dev->header_name, (unsigned long long) notify_id,
1351 (unsigned int) opcode);
1355 * Request sync osd notify
/* Send a NOTIFY on the header object (e.g. after creating a snapshot) so
 * other watchers refresh, then wait for the notify round to complete.
 * NOTE(review): the watch.timeout value 12 is a magic constant here;
 * confirm its unit/source against the full file. */
1357 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1359 struct ceph_osd_req_op *ops;
1360 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1361 struct ceph_osd_event *event;
1362 struct rbd_notify_info info;
1363 int payload_len = sizeof(u32) + sizeof(u32);
1366 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1370 info.rbd_dev = rbd_dev;
1372 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1373 (void *)&info, &event);
1377 ops[0].watch.ver = 1;
1378 ops[0].watch.flag = 1;
1379 ops[0].watch.cookie = event->cookie;
1380 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1381 ops[0].watch.timeout = 12;
1383 ret = rbd_req_sync_op(rbd_dev, NULL,
1385 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1387 rbd_dev->header_name,
1388 0, 0, NULL, NULL, NULL);
1392 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1393 dout("ceph_osdc_wait_event returned %d\n", ret);
1394 rbd_destroy_ops(ops);
/* Error path: cancel the event if the notify could not be issued. */
1398 ceph_osdc_cancel_event(event);
1400 rbd_destroy_ops(ops);
1405 * Request sync osd read
/* Synchronously invoke an OSD class method (CEPH_OSD_OP_CALL) on
 * @object_name with @data as input; used for e.g. snapshot creation.
 * NOTE(review): the stale "Request sync osd read" comment above looks
 * copy-pasted — the op is an exec, not a read. */
1407 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1408 const char *object_name,
1409 const char *class_name,
1410 const char *method_name,
1415 struct ceph_osd_req_op *ops;
1416 int class_name_len = strlen(class_name);
1417 int method_name_len = strlen(method_name);
1420 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1421 class_name_len + method_name_len + len);
1425 ops[0].cls.class_name = class_name;
1426 ops[0].cls.class_len = (__u8) class_name_len;
1427 ops[0].cls.method_name = method_name;
1428 ops[0].cls.method_len = (__u8) method_name_len;
1429 ops[0].cls.argc = 0;
1430 ops[0].cls.indata = data;
1431 ops[0].cls.indata_len = len;
1433 ret = rbd_req_sync_op(rbd_dev, NULL,
1435 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1437 object_name, 0, 0, NULL, NULL, ver);
1439 rbd_destroy_ops(ops);
1441 dout("cls_exec returned %d\n", ret);
/* Allocate a request collection sized for num_reqs trailing status slots
 * (flexible-array allocation), with total and kref initialized. */
1445 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1447 struct rbd_req_coll *coll =
1448 kzalloc(sizeof(struct rbd_req_coll) +
1449 sizeof(struct rbd_req_status) * num_reqs,
1454 coll->total = num_reqs;
1455 kref_init(&coll->kref);
1460 * block device queue callback
/* Request-queue handler: for each blk request, validates it (FS type,
 * not a write to a read-only mapping, snapshot still exists), snapshots
 * the snap context under header_rwsem, splits the request into
 * per-object segments via bio_chain_clone, and dispatches each segment
 * as an async OSD read or write tracked by one rbd_req_coll.
 * NOTE(review): fragmented — the inner segment loop boundaries, several
 * continue/cleanup lines, and some dispatch arguments are missing from
 * this excerpt. */
1462 static void rbd_rq_fn(struct request_queue *q)
1464 struct rbd_device *rbd_dev = q->queuedata;
1466 struct bio_pair *bp = NULL;
1468 while ((rq = blk_fetch_request(q))) {
1470 struct bio *rq_bio, *next_bio = NULL;
1475 int num_segs, cur_seg = 0;
1476 struct rbd_req_coll *coll;
1477 struct ceph_snap_context *snapc;
1479 /* peek at request from block layer */
1483 dout("fetched request\n");
1485 /* filter out block requests we don't understand */
1486 if ((rq->cmd_type != REQ_TYPE_FS)) {
1487 __blk_end_request_all(rq, 0);
1491 /* deduce our operation (read, write) */
1492 do_write = (rq_data_dir(rq) == WRITE);
1494 size = blk_rq_bytes(rq);
1495 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1497 if (do_write && rbd_dev->read_only) {
1498 __blk_end_request_all(rq, -EROFS);
/* Drop the queue lock while talking to the OSDs; retaken before
 * completing requests. */
1502 spin_unlock_irq(q->queue_lock);
1504 down_read(&rbd_dev->header_rwsem);
/* Mapped snapshot was deleted underneath us: fail with ENXIO. */
1506 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1507 up_read(&rbd_dev->header_rwsem);
1508 dout("request for non-existent snapshot");
1509 spin_lock_irq(q->queue_lock);
1510 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context so concurrent header refreshes can't free it. */
1514 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1516 up_read(&rbd_dev->header_rwsem);
1518 dout("%s 0x%x bytes at 0x%llx\n",
1519 do_write ? "write" : "read",
1520 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1522 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1523 coll = rbd_alloc_coll(num_segs);
1525 spin_lock_irq(q->queue_lock);
1526 __blk_end_request_all(rq, -ENOMEM);
1527 ceph_put_snap_context(snapc);
1532 /* a bio clone to be passed down to OSD req */
1533 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 op_size = rbd_get_segment(&rbd_dev->header,
1535 rbd_dev->header.object_prefix,
/* One kref per in-flight segment; dropped in the completion path. */
1538 kref_get(&coll->kref);
1539 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1540 op_size, GFP_ATOMIC);
1542 rbd_coll_end_req_index(rq, coll, cur_seg,
1548 /* init OSD command: write or read */
1550 rbd_req_write(rq, rbd_dev,
1556 rbd_req_read(rq, rbd_dev,
1569 kref_put(&coll->kref, rbd_coll_release);
1572 bio_pair_release(bp);
1573 spin_lock_irq(q->queue_lock);
1575 ceph_put_snap_context(snapc);
1580 * a queue callback. Makes sure that we don't create a bio that spans across
1581 * multiple osd objects. One exception would be with a single page bios,
1582 * which we handle later at bio_chain_clone
/* merge_bvec_fn: returns how many bytes of @bvec may be added to the bio
 * described by @bmd without crossing an object (chunk) boundary; a
 * single-page bio (bio_sectors == 0) is always allowed and split later. */
1584 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1585 struct bio_vec *bvec)
1587 struct rbd_device *rbd_dev = q->queuedata;
1588 unsigned int chunk_sectors;
1590 unsigned int bio_sectors;
1593 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1594 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1595 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
/* Bytes remaining in the current chunk after the bio's existing payload. */
1597 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1598 + bio_sectors)) << SECTOR_SHIFT;
1600 max = 0; /* bio_add cannot handle a negative return */
1601 if (max <= bvec->bv_len && bio_sectors == 0)
1602 return bvec->bv_len;
/* Free the gendisk, its queue, and the cached header for a device.
 * NOTE(review): the del_gendisk/put_disk lines are missing from this
 * excerpt. */
1606 static void rbd_free_disk(struct rbd_device *rbd_dev)
1608 struct gendisk *disk = rbd_dev->disk;
1613 rbd_header_free(&rbd_dev->header);
1615 if (disk->flags & GENHD_FL_UP)
1618 blk_cleanup_queue(disk->queue);
1623 * Read the complete header for the given rbd device.
1625 * Returns a pointer to a dynamically-allocated buffer containing
1626 * the complete and validated header. Caller can pass the address
1627 * of a variable that will be filled in with the version of the
1628 * header object at the time it was read.
1630 * Returns a pointer-coded errno if a failure occurs.
/* Reads the header object, retrying if the snapshot count changed
 * between sizing the buffer and reading it (the do/while on want_count).
 * NOTE(review): loop-entry lines (initial snap_count, the do {, kfree of
 * the previous buffer) and error-path gotos are missing from this
 * excerpt. */
1632 static struct rbd_image_header_ondisk *
1633 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1635 struct rbd_image_header_ondisk *ondisk = NULL;
1642 * The complete header will include an array of its 64-bit
1643 * snapshot ids, followed by the names of those snapshots as
1644 * a contiguous block of NUL-terminated strings. Note that
1645 * the number of snapshots could change by the time we read
1646 * it in, in which case we re-read it.
/* Size the buffer for the fixed header plus per-snapshot records. */
1653 size = sizeof (*ondisk);
1654 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1656 ondisk = kmalloc(size, GFP_KERNEL);
1658 return ERR_PTR(-ENOMEM);
1660 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1661 rbd_dev->header_name,
1663 (char *) ondisk, version);
1667 if (WARN_ON((size_t) ret < size)) {
1669 pr_warning("short header read for image %s"
1670 " (want %zd got %d)\n",
1671 rbd_dev->image_name, size, ret);
1674 if (!rbd_dev_ondisk_valid(ondisk)) {
1676 pr_warning("invalid header for image %s\n",
1677 rbd_dev->image_name);
/* Re-read if the on-disk snapshot count no longer matches what we
 * sized the buffer for. */
1681 names_size = le64_to_cpu(ondisk->snap_names_len);
1682 want_count = snap_count;
1683 snap_count = le32_to_cpu(ondisk->snap_count);
1684 } while (snap_count != want_count);
1691 return ERR_PTR(ret);
1695 * Re-read the on-disk header object and convert it to in-core form.
/*
 * Reads the v1 header via rbd_dev_v1_header_read(), fills in *header
 * from it, and records the header object version observed at read time.
 * Returns 0 on success or a negative errno.
 */
1697 static int rbd_read_header(struct rbd_device *rbd_dev,
1698 struct rbd_image_header *header)
1700 struct rbd_image_header_ondisk *ondisk;
1704 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1706 return PTR_ERR(ondisk);
1707 ret = rbd_header_from_disk(header, ondisk);
1709 header->obj_version = ver;
/*
 * Create a new snapshot of the mapped image: allocate a snapshot id
 * from the monitor, then ask the OSD (via a class method call on the
 * header object) to record the (name, id) pair.
 * Returns 0 on success, negative errno on failure.
 */
1718 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1719 const char *snap_name,
1722 int name_len = strlen(snap_name);
1726 struct ceph_mon_client *monc;
1728 /* we should create a snapshot only if we're pointing at the head */
1729 if (rbd_dev->snap_id != CEPH_NOSNAP)
1732 monc = &rbd_dev->rbd_client->client->monc;
1733 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1734 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* Payload: length-prefixed name plus 64-bit snap id; 16 bytes covers
 * the two fixed-size fields alongside the name bytes. */
1738 data = kmalloc(name_len + 16, gfp_flags);
1743 e = data + name_len + 16;
/* *_safe encoders jump to the "bad" label on buffer overrun (elided). */
1745 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1746 ceph_encode_64_safe(&p, e, new_snapid, bad);
1748 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1750 data, p - data, NULL);
1754 return ret < 0 ? ret : 0;
/*
 * Unregister and remove every snapshot device hanging off rbd_dev.
 * Uses the _safe iterator because __rbd_remove_snap_dev() unlinks
 * each entry as it goes.
 */
1759 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1761 struct rbd_snap *snap;
1762 struct rbd_snap *next;
1764 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1765 __rbd_remove_snap_dev(snap);
1769 * Re-read the image header and fold the updated fields into
1769.1 * rbd_dev->header.  Caller must hold ctl_mutex (see rbd_refresh_header).
1771 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1774 struct rbd_image_header h;
1776 ret = rbd_read_header(rbd_dev, &h);
1780 down_write(&rbd_dev->header_rwsem);
/* Only resize the disk when mapped at HEAD; snapshots are fixed-size. */
1783 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1784 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1786 dout("setting size to %llu sectors", (unsigned long long) size);
1787 set_capacity(rbd_dev->disk, size);
1790 /* rbd_dev->header.object_prefix shouldn't change */
1791 kfree(rbd_dev->header.snap_sizes);
1792 kfree(rbd_dev->header.snap_names);
1793 /* osd requests may still refer to snapc */
1794 ceph_put_snap_context(rbd_dev->header.snapc);
/* Adopt the freshly-read fields; ownership of h's allocations moves
 * into rbd_dev->header (except object_prefix, freed below). */
1797 *hver = h.obj_version;
1798 rbd_dev->header.obj_version = h.obj_version;
1799 rbd_dev->header.image_size = h.image_size;
1800 rbd_dev->header.total_snaps = h.total_snaps;
1801 rbd_dev->header.snapc = h.snapc;
1802 rbd_dev->header.snap_names = h.snap_names;
1803 rbd_dev->header.snap_names_len = h.snap_names_len;
1804 rbd_dev->header.snap_sizes = h.snap_sizes;
1805 /* Free the extra copy of the object prefix */
1806 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1807 kfree(h.object_prefix);
/* Rebuild the snapshot device list to match the new snap context. */
1809 ret = __rbd_init_snaps_header(rbd_dev);
1811 up_write(&rbd_dev->header_rwsem);
/*
 * Locked wrapper around __rbd_refresh_header(): serializes header
 * refresh against other control-path operations via ctl_mutex.
 */
1816 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1820 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1821 ret = __rbd_refresh_header(rbd_dev, hver);
1822 mutex_unlock(&ctl_mutex);
/*
 * Set up the gendisk and request queue for a newly-added rbd device:
 * read the header, build the snapshot list, pick the mapped snapshot,
 * allocate and configure disk + queue, then announce the capacity.
 */
1827 static int rbd_init_disk(struct rbd_device *rbd_dev)
1829 struct gendisk *disk;
1830 struct request_queue *q;
1835 /* contact OSD, request size info about the object being mapped */
1836 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1840 /* no need to lock here, as rbd_dev is not registered yet */
1841 rc = __rbd_init_snaps_header(rbd_dev);
1845 rc = rbd_header_set_snap(rbd_dev, &total_size);
1849 /* create gendisk info */
1851 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1855 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1857 disk->major = rbd_dev->major;
1858 disk->first_minor = 0;
1859 disk->fops = &rbd_bd_ops;
1860 disk->private_data = rbd_dev;
1864 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1868 /* We use the default size, but let's be explicit about it. */
1869 blk_queue_physical_block_size(q, SECTOR_SIZE);
1871 /* set io sizes to object size */
1872 segment_size = rbd_obj_bytes(&rbd_dev->header);
1873 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1874 blk_queue_max_segment_size(q, segment_size);
1875 blk_queue_io_min(q, segment_size);
1876 blk_queue_io_opt(q, segment_size);
/* Keep bios within one RADOS object (see rbd_merge_bvec above). */
1878 blk_queue_merge_bvec(q, rbd_merge_bvec);
1881 q->queuedata = rbd_dev;
1883 rbd_dev->disk = disk;
1886 /* finally, announce the disk to the world */
1887 set_capacity(disk, total_size / SECTOR_SIZE);
1890 pr_info("%s: added with size 0x%llx\n",
1891 disk->disk_name, (unsigned long long)total_size);
/* Map a struct device embedded in an rbd_device back to its container. */
1904 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1906 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": current mapped capacity in bytes. */
1909 static ssize_t rbd_size_show(struct device *dev,
1910 struct device_attribute *attr, char *buf)
1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* header_rwsem keeps the capacity stable against concurrent refresh. */
1915 down_read(&rbd_dev->header_rwsem);
1916 size = get_capacity(rbd_dev->disk);
1917 up_read(&rbd_dev->header_rwsem);
1919 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major": block device major number. */
1922 static ssize_t rbd_major_show(struct device *dev,
1923 struct device_attribute *attr, char *buf)
1925 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1927 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": the ceph client instance id, as "client<N>". */
1930 static ssize_t rbd_client_id_show(struct device *dev,
1931 struct device_attribute *attr, char *buf)
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1935 return sprintf(buf, "client%lld\n",
1936 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the RADOS pool holding the image. */
1939 static ssize_t rbd_pool_show(struct device *dev,
1940 struct device_attribute *attr, char *buf)
1942 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1944 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the RADOS pool. */
1947 static ssize_t rbd_pool_id_show(struct device *dev,
1948 struct device_attribute *attr, char *buf)
1950 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1952 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the rbd image name. */
1955 static ssize_t rbd_name_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": mapped snapshot name ("-" when mapped at HEAD). */
1963 static ssize_t rbd_snap_show(struct device *dev,
1964 struct device_attribute *attr,
1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1969 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): force a header re-read from the OSDs. */
1972 static ssize_t rbd_image_refresh(struct device *dev,
1973 struct device_attribute *attr,
1977 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1980 ret = rbd_refresh_header(rbd_dev, NULL);
1982 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; read-only except refresh/create_snap. */
1985 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1986 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1987 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1988 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1989 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1990 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1991 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1992 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1993 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* Attribute table wired into rbd_device_type below; NULL-terminated
 * (terminator line elided in this extraction). */
1995 static struct attribute *rbd_attrs[] = {
1996 &dev_attr_size.attr,
1997 &dev_attr_major.attr,
1998 &dev_attr_client_id.attr,
1999 &dev_attr_pool.attr,
2000 &dev_attr_pool_id.attr,
2001 &dev_attr_name.attr,
2002 &dev_attr_current_snap.attr,
2003 &dev_attr_refresh.attr,
2004 &dev_attr_create_snap.attr,
2008 static struct attribute_group rbd_attr_group = {
2012 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op here; rbd_dev teardown happens in rbd_dev_release. */
2017 static void rbd_sysfs_dev_release(struct device *dev)
2021 static struct device_type rbd_device_type = {
2023 .groups = rbd_attr_groups,
2024 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": image size (bytes) at the time of this snapshot. */
2032 static ssize_t rbd_snap_size_show(struct device *dev,
2033 struct device_attribute *attr,
2036 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2038 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id": the snapshot's RADOS snapshot id. */
2041 static ssize_t rbd_snap_id_show(struct device *dev,
2042 struct device_attribute *attr,
2045 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2047 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes and the device_type that carries them. */
2050 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2051 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2053 static struct attribute *rbd_snap_attrs[] = {
2054 &dev_attr_snap_size.attr,
2055 &dev_attr_snap_id.attr,
2059 static struct attribute_group rbd_snap_attr_group = {
2060 .attrs = rbd_snap_attrs,
/* Device-model release: frees the rbd_snap (free itself elided here). */
2063 static void rbd_snap_dev_release(struct device *dev)
2065 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2070 static const struct attribute_group *rbd_snap_attr_groups[] = {
2071 &rbd_snap_attr_group,
2075 static struct device_type rbd_snap_device_type = {
2076 .groups = rbd_snap_attr_groups,
2077 .release = rbd_snap_dev_release,
/*
 * Unlink a snapshot from the device's snap list and unregister its
 * struct device; the release callback frees the rbd_snap itself.
 */
2080 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2082 list_del(&snap->node);
2083 device_unregister(&snap->dev);
/*
 * Register the snapshot's struct device under the given parent
 * (the rbd device), named "snap_<name>" in sysfs.
 */
2086 static int rbd_register_snap_dev(struct rbd_snap *snap,
2087 struct device *parent)
2089 struct device *dev = &snap->dev;
2092 dev->type = &rbd_snap_device_type;
2093 dev->parent = parent;
2094 dev->release = rbd_snap_dev_release;
2095 dev_set_name(dev, "snap_%s", snap->name);
2096 ret = device_register(dev);
/*
 * Allocate an rbd_snap for entry i of the header's snapshot context,
 * copying its name, size, and id; register it in sysfs if the parent
 * device is already registered.  Returns the snap or a pointer-coded
 * errno (error-unwind lines elided in this extraction).
 */
2101 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2102 int i, const char *name)
2104 struct rbd_snap *snap;
2107 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2109 return ERR_PTR(-ENOMEM);
2112 snap->name = kstrdup(name, GFP_KERNEL);
2116 snap->size = rbd_dev->header.snap_sizes[i];
2117 snap->id = rbd_dev->header.snapc->snaps[i];
/* During initial setup the parent isn't registered yet; skip sysfs. */
2118 if (device_is_registered(&rbd_dev->dev)) {
2119 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2130 return ERR_PTR(ret);
2134 * Scan the rbd device's current snapshot list and compare it to the
2135 * newly-received snapshot context. Remove any existing snapshots
2136 * not present in the new snapshot context. Add a new snapshot for
2137 * any snapshots in the snapshot context not in the current list.
2138 * And verify there are no changes to snapshots we already know
2141 * Assumes the snapshots in the snapshot context are sorted by
2142 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2143 * are also maintained in that order.)
/*
 * Merge the device's snapshot list with the (sorted) snapshot context:
 * walks both sequences in step, removing stale entries, inserting new
 * ones, and sanity-checking entries present in both.
 */
2145 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2147 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2148 const u32 snap_count = snapc->num_snaps;
2149 char *snap_name = rbd_dev->header.snap_names;
2150 struct list_head *head = &rbd_dev->snaps;
2151 struct list_head *links = head->next;
/* Loop until both the context array and the existing list are consumed. */
2154 while (index < snap_count || links != head) {
2156 struct rbd_snap *snap;
/* CEPH_NOSNAP acts as the sentinel once either sequence runs out. */
2158 snap_id = index < snap_count ? snapc->snaps[index]
2160 snap = links != head ? list_entry(links, struct rbd_snap, node)
2162 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2164 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165 struct list_head *next = links->next;
2167 /* Existing snapshot not in the new snap context */
/* If the mapped snapshot vanished, mark the mapping dead. */
2169 if (rbd_dev->snap_id == snap->id)
2170 rbd_dev->snap_exists = false;
2171 __rbd_remove_snap_dev(snap);
2173 /* Done with this list entry; advance */
2179 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2180 struct rbd_snap *new_snap;
2182 /* We haven't seen this snapshot before */
2184 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2186 if (IS_ERR(new_snap))
2187 return PTR_ERR(new_snap);
2189 /* New goes before existing, or at end of list */
2192 list_add_tail(&new_snap->node, &snap->node);
2194 list_add_tail(&new_snap->node, head);
2196 /* Already have this one */
/* Same id must mean same size and name — anything else is a bug. */
2198 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2199 BUG_ON(strcmp(snap->name, snap_name));
2201 /* Done with this list entry; advance */
2203 links = links->next;
2206 /* Advance to the next entry in the snapshot context */
/* Names are packed as consecutive NUL-terminated strings. */
2209 snap_name += strlen(snap_name) + 1;
/*
 * Register the rbd device on the rbd bus (named by its numeric id),
 * then register a sysfs device for each already-known snapshot.
 * Runs under ctl_mutex.
 */
2215 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2219 struct rbd_snap *snap;
2221 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2222 dev = &rbd_dev->dev;
2224 dev->bus = &rbd_bus_type;
2225 dev->type = &rbd_device_type;
2226 dev->parent = &rbd_root_dev;
2227 dev->release = rbd_dev_release;
2228 dev_set_name(dev, "%d", rbd_dev->dev_id);
2229 ret = device_register(dev);
2233 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2234 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2239 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; rbd_dev_release() does the real teardown. */
2243 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2245 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object so we get notified of header
 * changes.  -ERANGE indicates our cached header version is stale:
 * refresh it and retry the watch.
 */
2248 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2253 ret = rbd_req_sync_watch(rbd_dev);
2254 if (ret == -ERANGE) {
2255 rc = rbd_refresh_header(rbd_dev, NULL);
2259 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2264 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2267 * Get a unique rbd identifier for the given new rbd_dev, and add
2268 * the rbd_dev to the global list. The minimum rbd id is 1.
2270 static void rbd_id_get(struct rbd_device *rbd_dev)
2272 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2274 spin_lock(&rbd_dev_list_lock);
2275 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2276 spin_unlock(&rbd_dev_list_lock);
2280 * Remove an rbd_dev from the global list, and record that its
2281 * identifier is no longer in use.
2283 static void rbd_id_put(struct rbd_device *rbd_dev)
2285 struct list_head *tmp;
2286 int rbd_id = rbd_dev->dev_id;
2291 spin_lock(&rbd_dev_list_lock);
2292 list_del_init(&rbd_dev->node);
2295 * If the id being "put" is not the current maximum, there
2296 * is nothing special we need to do.
2298 if (rbd_id != atomic64_read(&rbd_id_max)) {
2299 spin_unlock(&rbd_dev_list_lock);
2304 * We need to update the current maximum id. Search the
2305 * list to find out what it is. We're more likely to find
2306 * the maximum at the end, so search the list backward.
2309 list_for_each_prev(tmp, &rbd_dev_list) {
2310 struct rbd_device *rbd_dev;
2312 rbd_dev = list_entry(tmp, struct rbd_device, node);
/* NOTE(review): comparison reads rbd_id here; the elided lines
 * presumably track the running max — confirm against full source. */
2313 if (rbd_id > max_id)
2316 spin_unlock(&rbd_dev_list_lock);
2319 * The max id could have been updated by rbd_id_get(), in
2320 * which case it now accurately reflects the new maximum.
2321 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers rbd_id_max if no one raced past us. */
2324 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * next_token() -- position *buf at the next token and measure it.
 *
 * Advances *buf past any leading white space so it points at the first
 * non-space character (if any), and returns the length of the token
 * (the run of non-white-space characters) found there.  *buf must be
 * terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	size_t token_len;

	*buf += strspn(*buf, whitespace);	/* skip to start of token */
	token_len = strcspn(*buf, whitespace);	/* measure the token */

	return token_len;
}
2347 * Finds the next token in *buf, and if the provided token buffer is
2348 * big enough, copies the found token into it. The result, if
2349 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2350 * must be terminated with '\0' on entry.
2352 * Returns the length of the token found (not including the '\0').
2353 * Return value will be 0 if no token is found, and it will be >=
2354 * token_size if the token would not fit.
2356 * The *buf pointer will be updated to point beyond the end of the
2357 * found token. Note that this occurs even if the token buffer is
2358 * too small to hold it.
2360 static inline size_t copy_token(const char **buf,
2366 len = next_token(buf);
/* Copy only when the token plus its terminator fits the caller's buffer. */
2367 if (len < token_size) {
2368 memcpy(token, *buf, len);
2369 *(token + len) = '\0';
2377 * Finds the next token in *buf, dynamically allocates a buffer big
2378 * enough to hold a copy of it, and copies the token into the new
2379 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2380 * that a duplicate buffer is created even for a zero-length token.
2382 * Returns a pointer to the newly-allocated duplicate, or a null
2383 * pointer if memory for the duplicate was not available. If
2384 * the lenp argument is a non-null pointer, the length of the token
2385 * (not including the '\0') is returned in *lenp.
2387 * If successful, the *buf pointer will be updated to point beyond
2388 * the end of the found token.
2390 * Note: uses GFP_KERNEL for allocation.
2392 static inline char *dup_token(const char **buf, size_t *lenp)
2397 len = next_token(buf);
/* +1 for the '\0' terminator added below. */
2398 dup = kmalloc(len + 1, GFP_KERNEL);
2402 memcpy(dup, *buf, len);
2403 *(dup + len) = '\0';
2413 * This fills in the pool_name, image_name, image_name_len, snap_name,
2414 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2415 * on the list of monitor addresses and other options provided via
2418 * Note: rbd_dev is assumed to have been initially zero-filled.
2420 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2422 const char **mon_addrs,
2423 size_t *mon_addrs_size,
2425 size_t options_size)
2430 /* The first four tokens are required */
/* Token 1: monitor address list (returned by reference, not copied). */
2432 len = next_token(&buf);
2435 *mon_addrs_size = len + 1;
/* Token 2: mount options, copied into the caller's buffer. */
2440 len = copy_token(&buf, options, options_size);
2441 if (!len || len >= options_size)
/* Tokens 3 and 4: pool and image name, duplicated on the heap. */
2445 rbd_dev->pool_name = dup_token(&buf, NULL);
2446 if (!rbd_dev->pool_name)
2449 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2450 if (!rbd_dev->image_name)
2453 /* Create the name of the header object */
2455 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2456 + sizeof (RBD_SUFFIX),
2458 if (!rbd_dev->header_name)
2460 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2463 * The snapshot name is optional. If none is supplied,
2464 * we use the default value.
2466 rbd_dev->snap_name = dup_token(&buf, &len);
2467 if (!rbd_dev->snap_name)
2470 /* Replace the empty name with the default */
2471 kfree(rbd_dev->snap_name);
2473 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2474 if (!rbd_dev->snap_name)
2477 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2478 sizeof (RBD_SNAP_HEAD_NAME));
/* Error unwind: free everything allocated above, in reverse order,
 * and reset the pointers so callers can't double-free. */
2484 kfree(rbd_dev->header_name);
2485 rbd_dev->header_name = NULL;
2486 kfree(rbd_dev->image_name);
2487 rbd_dev->image_name = NULL;
2488 rbd_dev->image_name_len = 0;
2489 kfree(rbd_dev->pool_name);
2490 rbd_dev->pool_name = NULL;
/*
 * Bus "add" handler: parse the user's add string, connect to the
 * cluster, register the block device and sysfs objects, and bring up
 * the disk.  On failure, unwinds in reverse order via the goto labels
 * (label lines elided in this extraction).
 */
2495 static ssize_t rbd_add(struct bus_type *bus,
2500 struct rbd_device *rbd_dev = NULL;
2501 const char *mon_addrs = NULL;
2502 size_t mon_addrs_size = 0;
2503 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the mapped device. */
2506 if (!try_module_get(THIS_MODULE))
2509 options = kmalloc(count, GFP_KERNEL);
2512 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2516 /* static rbd_device initialization */
2517 spin_lock_init(&rbd_dev->lock);
2518 INIT_LIST_HEAD(&rbd_dev->node);
2519 INIT_LIST_HEAD(&rbd_dev->snaps);
2520 init_rwsem(&rbd_dev->header_rwsem);
2522 /* generate unique id: find highest unique id, add one */
2523 rbd_id_get(rbd_dev);
2525 /* Fill in the device name, now that we have its id. */
2526 BUILD_BUG_ON(DEV_NAME_LEN
2527 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2528 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2530 /* parse add command */
2531 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* mon_addrs_size counts the trailing '\0'; pass the bare length. */
2536 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2538 if (IS_ERR(rbd_dev->rbd_client)) {
2539 rc = PTR_ERR(rbd_dev->rbd_client);
2540 rbd_dev->rbd_client = NULL;
/* Resolve the pool name to its id in the current osdmap. */
2545 osdc = &rbd_dev->rbd_client->client->osdc;
2546 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2548 goto err_out_client;
2549 rbd_dev->pool_id = rc;
2551 /* register our block device */
2552 rc = register_blkdev(0, rbd_dev->name);
2554 goto err_out_client;
2555 rbd_dev->major = rc;
2557 rc = rbd_bus_add_dev(rbd_dev);
2559 goto err_out_blkdev;
2562 * At this point cleanup in the event of an error is the job
2563 * of the sysfs code (initiated by rbd_bus_del_dev()).
2565 * Set up and announce blkdev mapping.
2567 rc = rbd_init_disk(rbd_dev);
2571 rc = rbd_init_watch_dev(rbd_dev);
2578 /* this will also clean up rest of rbd_dev stuff */
2580 rbd_bus_del_dev(rbd_dev);
2585 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2587 rbd_put_client(rbd_dev);
/* pool_name non-NULL implies parse_args succeeded; free its strings. */
2589 if (rbd_dev->pool_name) {
2590 kfree(rbd_dev->snap_name);
2591 kfree(rbd_dev->header_name);
2592 kfree(rbd_dev->image_name);
2593 kfree(rbd_dev->pool_name);
2595 rbd_id_put(rbd_dev);
2600 dout("Error adding device %s\n", buf);
2601 module_put(THIS_MODULE);
2603 return (ssize_t) rc;
/*
 * Look up an rbd device by its numeric id under rbd_dev_list_lock.
 * Returns the device, or (per the elided tail) NULL if not found —
 * TODO confirm the fall-through return against the full source.
 */
2606 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2608 struct list_head *tmp;
2609 struct rbd_device *rbd_dev;
2611 spin_lock(&rbd_dev_list_lock);
2612 list_for_each(tmp, &rbd_dev_list) {
2613 rbd_dev = list_entry(tmp, struct rbd_device, node);
2614 if (rbd_dev->dev_id == dev_id) {
2615 spin_unlock(&rbd_dev_list_lock);
2619 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: final teardown once the last sysfs
 * reference drops.  Cancels the header watch, releases the client,
 * frees the disk and strings, returns the id, and drops the module
 * reference taken in rbd_add().
 */
2623 static void rbd_dev_release(struct device *dev)
2625 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2627 if (rbd_dev->watch_request) {
2628 struct ceph_client *client = rbd_dev->rbd_client->client;
2630 ceph_osdc_unregister_linger_request(&client->osdc,
2631 rbd_dev->watch_request);
2633 if (rbd_dev->watch_event)
2634 rbd_req_sync_unwatch(rbd_dev);
2636 rbd_put_client(rbd_dev);
2638 /* clean up and free blkdev */
2639 rbd_free_disk(rbd_dev);
2640 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2642 /* done with the id, and with the rbd_dev */
2643 kfree(rbd_dev->snap_name);
2644 kfree(rbd_dev->header_name);
2645 kfree(rbd_dev->pool_name);
2646 kfree(rbd_dev->image_name);
2647 rbd_id_put(rbd_dev);
2650 /* release module ref */
2651 module_put(THIS_MODULE);
/*
 * Bus "remove" handler: parse the target device id from the user's
 * string, look the device up, and tear it down (snapshots first).
 */
2654 static ssize_t rbd_remove(struct bus_type *bus,
2658 struct rbd_device *rbd_dev = NULL;
2663 rc = strict_strtoul(buf, 10, &ul);
2667 /* convert to int; abort if we lost anything in the conversion */
2668 target_id = (int) ul;
2669 if (target_id != ul)
2672 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2674 rbd_dev = __rbd_get_dev(target_id);
2680 __rbd_remove_all_snaps(rbd_dev);
2681 rbd_bus_del_dev(rbd_dev);
2684 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" handler: snapshot the image under the written
 * name, refresh our header, then notify other watchers.
 */
2688 static ssize_t rbd_snap_add(struct device *dev,
2689 struct device_attribute *attr,
2693 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2695 char *name = kmalloc(count + 1, GFP_KERNEL);
/* NOTE(review): snprintf with size "count" keeps at most count-1 bytes
 * of buf even though count+1 were allocated — this drops the final
 * character (typically the trailing newline, but also of names written
 * without one).  Verify intent against the full source. */
2699 snprintf(name, count, "%s", buf);
2701 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2703 ret = rbd_header_add_snap(rbd_dev,
2708 ret = __rbd_refresh_header(rbd_dev, NULL);
2712 /* shouldn't hold ctl_mutex when notifying.. notify might
2713 trigger a watch callback that would need to get that mutex */
2714 mutex_unlock(&ctl_mutex);
2716 /* make a best effort, don't error if failed */
2717 rbd_req_sync_notify(rbd_dev);
2724 mutex_unlock(&ctl_mutex);
2730 * create control files in sysfs
/* Register the root device and the rbd bus; unwind the root device
 * if bus registration fails. */
2733 static int rbd_sysfs_init(void)
2737 ret = device_register(&rbd_root_dev);
2741 ret = bus_register(&rbd_bus_type);
2743 device_unregister(&rbd_root_dev);
/* Mirror of rbd_sysfs_init(): unregister bus then root device. */
2748 static void rbd_sysfs_cleanup(void)
2750 bus_unregister(&rbd_bus_type);
2751 device_unregister(&rbd_root_dev);
/* Module init: set up sysfs control files and announce the driver. */
2754 int __init rbd_init(void)
2758 rc = rbd_sysfs_init();
2761 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files. */
2765 void __exit rbd_exit(void)
2767 rbd_sysfs_cleanup();
/* Module entry points and metadata. */
2770 module_init(rbd_init);
2771 module_exit(rbd_exit);
2773 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2774 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2775 MODULE_DESCRIPTION("rados block device");
2777 /* following authorship retained from original osdblk.c */
2778 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2780 MODULE_LICENSE("GPL");