rbd: have rbd_parse_args() report found mon_addrs size
[linux-2.6-block.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
f0f8cef5
AE
44#define RBD_DRV_NAME "rbd"
45#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
81a89793
AE
56/*
57 * An RBD device name will be "rbd#", where the "rbd" comes from
58 * RBD_DRV_NAME above, and # is a unique integer identifier.
59 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
60 * enough to hold all possible device names.
61 */
602adf40 62#define DEV_NAME_LEN 32
81a89793 63#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 64
59c2be1e
YS
65#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
66
602adf40
YS
67/*
68 * block device image metadata (in-memory version)
69 */
70struct rbd_image_header {
71 u64 image_size;
72 char block_name[32];
73 __u8 obj_order;
74 __u8 crypt_type;
75 __u8 comp_type;
76 struct rw_semaphore snap_rwsem;
77 struct ceph_snap_context *snapc;
78 size_t snap_names_len;
79 u64 snap_seq;
80 u32 total_snaps;
81
82 char *snap_names;
83 u64 *snap_sizes;
59c2be1e
YS
84
85 u64 obj_version;
86};
87
88struct rbd_options {
89 int notify_timeout;
602adf40
YS
90};
91
92/*
f0f8cef5 93 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
94 */
95struct rbd_client {
96 struct ceph_client *client;
59c2be1e 97 struct rbd_options *rbd_opts;
602adf40
YS
98 struct kref kref;
99 struct list_head node;
100};
101
102/*
f0f8cef5 103 * a request completion status
602adf40 104 */
1fec7093
YS
105struct rbd_req_status {
106 int done;
107 int rc;
108 u64 bytes;
109};
110
111/*
112 * a collection of requests
113 */
114struct rbd_req_coll {
115 int total;
116 int num_done;
117 struct kref kref;
118 struct rbd_req_status status[0];
602adf40
YS
119};
120
f0f8cef5
AE
121/*
122 * a single io request
123 */
124struct rbd_request {
125 struct request *rq; /* blk layer request */
126 struct bio *bio; /* cloned bio */
127 struct page **pages; /* list of used pages */
128 u64 len;
129 int coll_index;
130 struct rbd_req_coll *coll;
131};
132
dfc5606d
YS
133struct rbd_snap {
134 struct device dev;
135 const char *name;
136 size_t size;
137 struct list_head node;
138 u64 id;
139};
140
602adf40
YS
141/*
142 * a single device
143 */
144struct rbd_device {
145 int id; /* blkdev unique id */
146
147 int major; /* blkdev assigned major */
148 struct gendisk *disk; /* blkdev's gendisk and rq */
149 struct request_queue *q;
150
602adf40
YS
151 struct rbd_client *rbd_client;
152
153 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
154
155 spinlock_t lock; /* queue lock */
156
157 struct rbd_image_header header;
158 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
159 int obj_len;
160 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
161 char pool_name[RBD_MAX_POOL_NAME_LEN];
162 int poolid;
163
59c2be1e
YS
164 struct ceph_osd_event *watch_event;
165 struct ceph_osd_request *watch_request;
166
602adf40
YS
167 char snap_name[RBD_MAX_SNAP_NAME_LEN];
168 u32 cur_snap; /* index+1 of current snapshot within snap context
169 0 - for the head */
170 int read_only;
171
172 struct list_head node;
dfc5606d
YS
173
174 /* list of snapshots */
175 struct list_head snaps;
176
177 /* sysfs related */
178 struct device dev;
179};
180
602adf40 181static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 182
602adf40 183static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
184static DEFINE_SPINLOCK(rbd_dev_list_lock);
185
432b8587
AE
186static LIST_HEAD(rbd_client_list); /* clients */
187static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 188
dfc5606d
YS
189static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
190static void rbd_dev_release(struct device *dev);
dfc5606d
YS
191static ssize_t rbd_snap_add(struct device *dev,
192 struct device_attribute *attr,
193 const char *buf,
194 size_t count);
195static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 196 struct rbd_snap *snap);
dfc5606d 197
f0f8cef5
AE
198static ssize_t rbd_add(struct bus_type *bus, const char *buf,
199 size_t count);
200static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
201 size_t count);
202
203static struct bus_attribute rbd_bus_attrs[] = {
204 __ATTR(add, S_IWUSR, NULL, rbd_add),
205 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
206 __ATTR_NULL
207};
208
209static struct bus_type rbd_bus_type = {
210 .name = "rbd",
211 .bus_attrs = rbd_bus_attrs,
212};
213
214static void rbd_root_dev_release(struct device *dev)
215{
216}
217
218static struct device rbd_root_dev = {
219 .init_name = "rbd",
220 .release = rbd_root_dev_release,
221};
222
dfc5606d
YS
223
224static struct rbd_device *dev_to_rbd(struct device *dev)
225{
226 return container_of(dev, struct rbd_device, dev);
227}
228
229static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
230{
231 return get_device(&rbd_dev->dev);
232}
233
234static void rbd_put_dev(struct rbd_device *rbd_dev)
235{
236 put_device(&rbd_dev->dev);
237}
602adf40 238
59c2be1e
YS
239static int __rbd_update_snaps(struct rbd_device *rbd_dev);
240
602adf40
YS
241static int rbd_open(struct block_device *bdev, fmode_t mode)
242{
f0f8cef5 243 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 244
dfc5606d
YS
245 rbd_get_dev(rbd_dev);
246
602adf40
YS
247 set_device_ro(bdev, rbd_dev->read_only);
248
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250 return -EROFS;
251
252 return 0;
253}
254
dfc5606d
YS
255static int rbd_release(struct gendisk *disk, fmode_t mode)
256{
257 struct rbd_device *rbd_dev = disk->private_data;
258
259 rbd_put_dev(rbd_dev);
260
261 return 0;
262}
263
602adf40
YS
264static const struct block_device_operations rbd_bd_ops = {
265 .owner = THIS_MODULE,
266 .open = rbd_open,
dfc5606d 267 .release = rbd_release,
602adf40
YS
268};
269
270/*
271 * Initialize an rbd client instance.
272 * We own *opt.
273 */
59c2be1e
YS
274static struct rbd_client *rbd_client_create(struct ceph_options *opt,
275 struct rbd_options *rbd_opts)
602adf40
YS
276{
277 struct rbd_client *rbdc;
278 int ret = -ENOMEM;
279
280 dout("rbd_client_create\n");
281 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
282 if (!rbdc)
283 goto out_opt;
284
285 kref_init(&rbdc->kref);
286 INIT_LIST_HEAD(&rbdc->node);
287
bc534d86
AE
288 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
289
6ab00d46 290 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
602adf40 291 if (IS_ERR(rbdc->client))
bc534d86 292 goto out_mutex;
28f259b7 293 opt = NULL; /* Now rbdc->client is responsible for opt */
602adf40
YS
294
295 ret = ceph_open_session(rbdc->client);
296 if (ret < 0)
297 goto out_err;
298
59c2be1e
YS
299 rbdc->rbd_opts = rbd_opts;
300
432b8587 301 spin_lock(&rbd_client_list_lock);
602adf40 302 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 303 spin_unlock(&rbd_client_list_lock);
602adf40 304
bc534d86
AE
305 mutex_unlock(&ctl_mutex);
306
602adf40
YS
307 dout("rbd_client_create created %p\n", rbdc);
308 return rbdc;
309
310out_err:
311 ceph_destroy_client(rbdc->client);
bc534d86
AE
312out_mutex:
313 mutex_unlock(&ctl_mutex);
602adf40
YS
314 kfree(rbdc);
315out_opt:
28f259b7
VK
316 if (opt)
317 ceph_destroy_options(opt);
318 return ERR_PTR(ret);
602adf40
YS
319}
320
321/*
322 * Find a ceph client with specific addr and configuration.
323 */
324static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
325{
326 struct rbd_client *client_node;
327
328 if (opt->flags & CEPH_OPT_NOSHARE)
329 return NULL;
330
331 list_for_each_entry(client_node, &rbd_client_list, node)
332 if (ceph_compare_options(opt, client_node->client) == 0)
333 return client_node;
334 return NULL;
335}
336
59c2be1e
YS
337/*
338 * mount options
339 */
340enum {
341 Opt_notify_timeout,
342 Opt_last_int,
343 /* int args above */
344 Opt_last_string,
345 /* string args above */
346};
347
348static match_table_t rbdopt_tokens = {
349 {Opt_notify_timeout, "notify_timeout=%d"},
350 /* int args above */
351 /* string args above */
352 {-1, NULL}
353};
354
355static int parse_rbd_opts_token(char *c, void *private)
356{
357 struct rbd_options *rbdopt = private;
358 substring_t argstr[MAX_OPT_ARGS];
359 int token, intval, ret;
360
21079786 361 token = match_token(c, rbdopt_tokens, argstr);
59c2be1e
YS
362 if (token < 0)
363 return -EINVAL;
364
365 if (token < Opt_last_int) {
366 ret = match_int(&argstr[0], &intval);
367 if (ret < 0) {
368 pr_err("bad mount option arg (not int) "
369 "at '%s'\n", c);
370 return ret;
371 }
372 dout("got int token %d val %d\n", token, intval);
373 } else if (token > Opt_last_int && token < Opt_last_string) {
374 dout("got string token %d val %s\n", token,
375 argstr[0].from);
376 } else {
377 dout("got token %d\n", token);
378 }
379
380 switch (token) {
381 case Opt_notify_timeout:
382 rbdopt->notify_timeout = intval;
383 break;
384 default:
385 BUG_ON(token);
386 }
387 return 0;
388}
389
602adf40
YS
390/*
391 * Get a ceph client with specific addr and configuration, if one does
392 * not exist create it.
393 */
5214ecc4
AE
394static struct rbd_client *rbd_get_client(const char *mon_addr,
395 size_t mon_addr_len,
396 char *options)
602adf40
YS
397{
398 struct rbd_client *rbdc;
399 struct ceph_options *opt;
59c2be1e
YS
400 struct rbd_options *rbd_opts;
401
402 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
403 if (!rbd_opts)
d720bcb0 404 return ERR_PTR(-ENOMEM);
59c2be1e
YS
405
406 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 407
ee57741c 408 opt = ceph_parse_options(options, mon_addr,
5214ecc4 409 mon_addr + mon_addr_len,
21079786 410 parse_rbd_opts_token, rbd_opts);
ee57741c 411 if (IS_ERR(opt)) {
d720bcb0
AE
412 kfree(rbd_opts);
413 return ERR_CAST(opt);
ee57741c 414 }
602adf40 415
432b8587 416 spin_lock(&rbd_client_list_lock);
602adf40
YS
417 rbdc = __rbd_client_find(opt);
418 if (rbdc) {
602adf40
YS
419 /* using an existing client */
420 kref_get(&rbdc->kref);
432b8587 421 spin_unlock(&rbd_client_list_lock);
e6994d3d 422
e6994d3d
AE
423 ceph_destroy_options(opt);
424 kfree(rbd_opts);
425
d720bcb0 426 return rbdc;
602adf40 427 }
432b8587 428 spin_unlock(&rbd_client_list_lock);
602adf40 429
59c2be1e 430 rbdc = rbd_client_create(opt, rbd_opts);
d97081b0 431
d720bcb0
AE
432 if (IS_ERR(rbdc))
433 kfree(rbd_opts);
602adf40 434
d720bcb0 435 return rbdc;
602adf40
YS
436}
437
438/*
439 * Destroy ceph client
d23a4b3f 440 *
432b8587 441 * Caller must hold rbd_client_list_lock.
602adf40
YS
442 */
443static void rbd_client_release(struct kref *kref)
444{
445 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
446
447 dout("rbd_release_client %p\n", rbdc);
602adf40 448 list_del(&rbdc->node);
602adf40
YS
449
450 ceph_destroy_client(rbdc->client);
59c2be1e 451 kfree(rbdc->rbd_opts);
602adf40
YS
452 kfree(rbdc);
453}
454
455/*
456 * Drop reference to ceph client node. If it's not referenced anymore, release
457 * it.
458 */
459static void rbd_put_client(struct rbd_device *rbd_dev)
460{
432b8587 461 spin_lock(&rbd_client_list_lock);
602adf40 462 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
432b8587 463 spin_unlock(&rbd_client_list_lock);
602adf40 464 rbd_dev->rbd_client = NULL;
602adf40
YS
465}
466
1fec7093
YS
467/*
468 * Destroy requests collection
469 */
470static void rbd_coll_release(struct kref *kref)
471{
472 struct rbd_req_coll *coll =
473 container_of(kref, struct rbd_req_coll, kref);
474
475 dout("rbd_coll_release %p\n", coll);
476 kfree(coll);
477}
602adf40
YS
478
479/*
480 * Create a new header structure, translate header format from the on-disk
481 * header.
482 */
483static int rbd_header_from_disk(struct rbd_image_header *header,
484 struct rbd_image_header_ondisk *ondisk,
485 int allocated_snaps,
486 gfp_t gfp_flags)
487{
488 int i;
489 u32 snap_count = le32_to_cpu(ondisk->snap_count);
490 int ret = -ENOMEM;
491
21079786 492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 493 return -ENXIO;
81e759fb 494
602adf40 495 init_rwsem(&header->snap_rwsem);
602adf40
YS
496 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
497 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
21079786 498 snap_count * sizeof (*ondisk),
602adf40
YS
499 gfp_flags);
500 if (!header->snapc)
501 return -ENOMEM;
502 if (snap_count) {
503 header->snap_names = kmalloc(header->snap_names_len,
504 GFP_KERNEL);
505 if (!header->snap_names)
506 goto err_snapc;
507 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
508 GFP_KERNEL);
509 if (!header->snap_sizes)
510 goto err_names;
511 } else {
512 header->snap_names = NULL;
513 header->snap_sizes = NULL;
514 }
515 memcpy(header->block_name, ondisk->block_name,
516 sizeof(ondisk->block_name));
517
518 header->image_size = le64_to_cpu(ondisk->image_size);
519 header->obj_order = ondisk->options.order;
520 header->crypt_type = ondisk->options.crypt_type;
521 header->comp_type = ondisk->options.comp_type;
522
523 atomic_set(&header->snapc->nref, 1);
524 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
525 header->snapc->num_snaps = snap_count;
526 header->total_snaps = snap_count;
527
21079786 528 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
529 for (i = 0; i < snap_count; i++) {
530 header->snapc->snaps[i] =
531 le64_to_cpu(ondisk->snaps[i].id);
532 header->snap_sizes[i] =
533 le64_to_cpu(ondisk->snaps[i].image_size);
534 }
535
536 /* copy snapshot names */
537 memcpy(header->snap_names, &ondisk->snaps[i],
538 header->snap_names_len);
539 }
540
541 return 0;
542
543err_names:
544 kfree(header->snap_names);
545err_snapc:
546 kfree(header->snapc);
547 return ret;
548}
549
550static int snap_index(struct rbd_image_header *header, int snap_num)
551{
552 return header->total_snaps - snap_num;
553}
554
555static u64 cur_snap_id(struct rbd_device *rbd_dev)
556{
557 struct rbd_image_header *header = &rbd_dev->header;
558
559 if (!rbd_dev->cur_snap)
560 return 0;
561
562 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
563}
564
565static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
566 u64 *seq, u64 *size)
567{
568 int i;
569 char *p = header->snap_names;
570
571 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
572 if (strcmp(snap_name, p) == 0)
573 break;
574 }
575 if (i == header->total_snaps)
576 return -ENOENT;
577 if (seq)
578 *seq = header->snapc->snaps[i];
579
580 if (size)
581 *size = header->snap_sizes[i];
582
583 return i;
584}
585
cc9d734c 586static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
602adf40
YS
587{
588 struct rbd_image_header *header = &dev->header;
589 struct ceph_snap_context *snapc = header->snapc;
590 int ret = -ENOENT;
591
cc9d734c
JD
592 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
593
602adf40
YS
594 down_write(&header->snap_rwsem);
595
cc9d734c
JD
596 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
597 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
598 if (header->total_snaps)
599 snapc->seq = header->snap_seq;
600 else
601 snapc->seq = 0;
602 dev->cur_snap = 0;
603 dev->read_only = 0;
604 if (size)
605 *size = header->image_size;
606 } else {
cc9d734c 607 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
602adf40
YS
608 if (ret < 0)
609 goto done;
610
611 dev->cur_snap = header->total_snaps - ret;
612 dev->read_only = 1;
613 }
614
615 ret = 0;
616done:
617 up_write(&header->snap_rwsem);
618 return ret;
619}
620
621static void rbd_header_free(struct rbd_image_header *header)
622{
623 kfree(header->snapc);
624 kfree(header->snap_names);
625 kfree(header->snap_sizes);
626}
627
628/*
629 * get the actual striped segment name, offset and length
630 */
631static u64 rbd_get_segment(struct rbd_image_header *header,
632 const char *block_name,
633 u64 ofs, u64 len,
634 char *seg_name, u64 *segofs)
635{
636 u64 seg = ofs >> header->obj_order;
637
638 if (seg_name)
639 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
640 "%s.%012llx", block_name, seg);
641
642 ofs = ofs & ((1 << header->obj_order) - 1);
643 len = min_t(u64, len, (1 << header->obj_order) - ofs);
644
645 if (segofs)
646 *segofs = ofs;
647
648 return len;
649}
650
1fec7093
YS
651static int rbd_get_num_segments(struct rbd_image_header *header,
652 u64 ofs, u64 len)
653{
654 u64 start_seg = ofs >> header->obj_order;
655 u64 end_seg = (ofs + len - 1) >> header->obj_order;
656 return end_seg - start_seg + 1;
657}
658
029bcbd8
JD
659/*
660 * returns the size of an object in the image
661 */
662static u64 rbd_obj_bytes(struct rbd_image_header *header)
663{
664 return 1 << header->obj_order;
665}
666
602adf40
YS
667/*
668 * bio helpers
669 */
670
671static void bio_chain_put(struct bio *chain)
672{
673 struct bio *tmp;
674
675 while (chain) {
676 tmp = chain;
677 chain = chain->bi_next;
678 bio_put(tmp);
679 }
680}
681
682/*
683 * zeros a bio chain, starting at specific offset
684 */
685static void zero_bio_chain(struct bio *chain, int start_ofs)
686{
687 struct bio_vec *bv;
688 unsigned long flags;
689 void *buf;
690 int i;
691 int pos = 0;
692
693 while (chain) {
694 bio_for_each_segment(bv, chain, i) {
695 if (pos + bv->bv_len > start_ofs) {
696 int remainder = max(start_ofs - pos, 0);
697 buf = bvec_kmap_irq(bv, &flags);
698 memset(buf + remainder, 0,
699 bv->bv_len - remainder);
85b5aaa6 700 bvec_kunmap_irq(buf, &flags);
602adf40
YS
701 }
702 pos += bv->bv_len;
703 }
704
705 chain = chain->bi_next;
706 }
707}
708
709/*
710 * bio_chain_clone - clone a chain of bios up to a certain length.
711 * might return a bio_pair that will need to be released.
712 */
713static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
714 struct bio_pair **bp,
715 int len, gfp_t gfpmask)
716{
717 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
718 int total = 0;
719
720 if (*bp) {
721 bio_pair_release(*bp);
722 *bp = NULL;
723 }
724
725 while (old_chain && (total < len)) {
726 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
727 if (!tmp)
728 goto err_out;
729
730 if (total + old_chain->bi_size > len) {
731 struct bio_pair *bp;
732
733 /*
734 * this split can only happen with a single paged bio,
735 * split_bio will BUG_ON if this is not the case
736 */
737 dout("bio_chain_clone split! total=%d remaining=%d"
738 "bi_size=%d\n",
739 (int)total, (int)len-total,
740 (int)old_chain->bi_size);
741
742 /* split the bio. We'll release it either in the next
743 call, or it will have to be released outside */
744 bp = bio_split(old_chain, (len - total) / 512ULL);
745 if (!bp)
746 goto err_out;
747
748 __bio_clone(tmp, &bp->bio1);
749
750 *next = &bp->bio2;
751 } else {
752 __bio_clone(tmp, old_chain);
753 *next = old_chain->bi_next;
754 }
755
756 tmp->bi_bdev = NULL;
757 gfpmask &= ~__GFP_WAIT;
758 tmp->bi_next = NULL;
759
760 if (!new_chain) {
761 new_chain = tail = tmp;
762 } else {
763 tail->bi_next = tmp;
764 tail = tmp;
765 }
766 old_chain = old_chain->bi_next;
767
768 total += tmp->bi_size;
769 }
770
771 BUG_ON(total < len);
772
773 if (tail)
774 tail->bi_next = NULL;
775
776 *old = old_chain;
777
778 return new_chain;
779
780err_out:
781 dout("bio_chain_clone with err\n");
782 bio_chain_put(new_chain);
783 return NULL;
784}
785
786/*
787 * helpers for osd request op vectors.
788 */
789static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
790 int num_ops,
791 int opcode,
792 u32 payload_len)
793{
794 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
795 GFP_NOIO);
796 if (!*ops)
797 return -ENOMEM;
798 (*ops)[0].op = opcode;
799 /*
800 * op extent offset and length will be set later on
801 * in calc_raw_layout()
802 */
803 (*ops)[0].payload_len = payload_len;
804 return 0;
805}
806
807static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
808{
809 kfree(ops);
810}
811
1fec7093
YS
812static void rbd_coll_end_req_index(struct request *rq,
813 struct rbd_req_coll *coll,
814 int index,
815 int ret, u64 len)
816{
817 struct request_queue *q;
818 int min, max, i;
819
820 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
821 coll, index, ret, len);
822
823 if (!rq)
824 return;
825
826 if (!coll) {
827 blk_end_request(rq, ret, len);
828 return;
829 }
830
831 q = rq->q;
832
833 spin_lock_irq(q->queue_lock);
834 coll->status[index].done = 1;
835 coll->status[index].rc = ret;
836 coll->status[index].bytes = len;
837 max = min = coll->num_done;
838 while (max < coll->total && coll->status[max].done)
839 max++;
840
841 for (i = min; i<max; i++) {
842 __blk_end_request(rq, coll->status[i].rc,
843 coll->status[i].bytes);
844 coll->num_done++;
845 kref_put(&coll->kref, rbd_coll_release);
846 }
847 spin_unlock_irq(q->queue_lock);
848}
849
850static void rbd_coll_end_req(struct rbd_request *req,
851 int ret, u64 len)
852{
853 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
854}
855
602adf40
YS
856/*
857 * Send ceph osd request
858 */
859static int rbd_do_request(struct request *rq,
860 struct rbd_device *dev,
861 struct ceph_snap_context *snapc,
862 u64 snapid,
863 const char *obj, u64 ofs, u64 len,
864 struct bio *bio,
865 struct page **pages,
866 int num_pages,
867 int flags,
868 struct ceph_osd_req_op *ops,
869 int num_reply,
1fec7093
YS
870 struct rbd_req_coll *coll,
871 int coll_index,
602adf40 872 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
873 struct ceph_msg *msg),
874 struct ceph_osd_request **linger_req,
875 u64 *ver)
602adf40
YS
876{
877 struct ceph_osd_request *req;
878 struct ceph_file_layout *layout;
879 int ret;
880 u64 bno;
881 struct timespec mtime = CURRENT_TIME;
882 struct rbd_request *req_data;
883 struct ceph_osd_request_head *reqhead;
884 struct rbd_image_header *header = &dev->header;
1dbb4399 885 struct ceph_osd_client *osdc;
602adf40 886
602adf40 887 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
888 if (!req_data) {
889 if (coll)
890 rbd_coll_end_req_index(rq, coll, coll_index,
891 -ENOMEM, len);
892 return -ENOMEM;
893 }
894
895 if (coll) {
896 req_data->coll = coll;
897 req_data->coll_index = coll_index;
898 }
602adf40 899
1fec7093 900 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
602adf40
YS
901
902 down_read(&header->snap_rwsem);
903
1dbb4399
AE
904 osdc = &dev->rbd_client->client->osdc;
905 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
906 false, GFP_NOIO, pages, bio);
4ad12621 907 if (!req) {
602adf40 908 up_read(&header->snap_rwsem);
4ad12621 909 ret = -ENOMEM;
602adf40
YS
910 goto done_pages;
911 }
912
913 req->r_callback = rbd_cb;
914
915 req_data->rq = rq;
916 req_data->bio = bio;
917 req_data->pages = pages;
918 req_data->len = len;
919
920 req->r_priv = req_data;
921
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
925 strncpy(req->r_oid, obj, sizeof(req->r_oid));
926 req->r_oid_len = strlen(req->r_oid);
927
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933 layout->fl_pg_preferred = cpu_to_le32(-1);
934 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
1dbb4399
AE
935 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
936 req, ops);
602adf40
YS
937
938 ceph_osdc_build_request(req, ofs, &len,
939 ops,
940 snapc,
941 &mtime,
942 req->r_oid, req->r_oid_len);
943 up_read(&header->snap_rwsem);
944
59c2be1e 945 if (linger_req) {
1dbb4399 946 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
947 *linger_req = req;
948 }
949
1dbb4399 950 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
951 if (ret < 0)
952 goto done_err;
953
954 if (!rbd_cb) {
1dbb4399 955 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
956 if (ver)
957 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
958 dout("reassert_ver=%lld\n",
959 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
960 ceph_osdc_put_request(req);
961 }
962 return ret;
963
964done_err:
965 bio_chain_put(req_data->bio);
966 ceph_osdc_put_request(req);
967done_pages:
1fec7093 968 rbd_coll_end_req(req_data, ret, len);
602adf40 969 kfree(req_data);
602adf40
YS
970 return ret;
971}
972
973/*
974 * Ceph osd op callback
975 */
976static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
977{
978 struct rbd_request *req_data = req->r_priv;
979 struct ceph_osd_reply_head *replyhead;
980 struct ceph_osd_op *op;
981 __s32 rc;
982 u64 bytes;
983 int read_op;
984
985 /* parse reply */
986 replyhead = msg->front.iov_base;
987 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
988 op = (void *)(replyhead + 1);
989 rc = le32_to_cpu(replyhead->result);
990 bytes = le64_to_cpu(op->extent.length);
991 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
992
993 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
994
995 if (rc == -ENOENT && read_op) {
996 zero_bio_chain(req_data->bio, 0);
997 rc = 0;
998 } else if (rc == 0 && read_op && bytes < req_data->len) {
999 zero_bio_chain(req_data->bio, bytes);
1000 bytes = req_data->len;
1001 }
1002
1fec7093 1003 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1004
1005 if (req_data->bio)
1006 bio_chain_put(req_data->bio);
1007
1008 ceph_osdc_put_request(req);
1009 kfree(req_data);
1010}
1011
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1016
602adf40
YS
1017/*
1018 * Do a synchronous ceph osd operation
1019 */
1020static int rbd_req_sync_op(struct rbd_device *dev,
1021 struct ceph_snap_context *snapc,
1022 u64 snapid,
1023 int opcode,
1024 int flags,
1025 struct ceph_osd_req_op *orig_ops,
1026 int num_reply,
1027 const char *obj,
1028 u64 ofs, u64 len,
59c2be1e
YS
1029 char *buf,
1030 struct ceph_osd_request **linger_req,
1031 u64 *ver)
602adf40
YS
1032{
1033 int ret;
1034 struct page **pages;
1035 int num_pages;
1036 struct ceph_osd_req_op *ops = orig_ops;
1037 u32 payload_len;
1038
1039 num_pages = calc_pages_for(ofs , len);
1040 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1041 if (IS_ERR(pages))
1042 return PTR_ERR(pages);
602adf40
YS
1043
1044 if (!orig_ops) {
1045 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1046 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1047 if (ret < 0)
1048 goto done;
1049
1050 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1051 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1052 if (ret < 0)
1053 goto done_ops;
1054 }
1055 }
1056
1057 ret = rbd_do_request(NULL, dev, snapc, snapid,
1058 obj, ofs, len, NULL,
1059 pages, num_pages,
1060 flags,
1061 ops,
1062 2,
1fec7093 1063 NULL, 0,
59c2be1e
YS
1064 NULL,
1065 linger_req, ver);
602adf40
YS
1066 if (ret < 0)
1067 goto done_ops;
1068
1069 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071
1072done_ops:
1073 if (!orig_ops)
1074 rbd_destroy_ops(ops);
1075done:
1076 ceph_release_page_vector(pages, num_pages);
1077 return ret;
1078}
1079
1080/*
1081 * Do an asynchronous ceph osd operation
1082 */
1083static int rbd_do_op(struct request *rq,
1084 struct rbd_device *rbd_dev ,
1085 struct ceph_snap_context *snapc,
1086 u64 snapid,
1087 int opcode, int flags, int num_reply,
1088 u64 ofs, u64 len,
1fec7093
YS
1089 struct bio *bio,
1090 struct rbd_req_coll *coll,
1091 int coll_index)
602adf40
YS
1092{
1093 char *seg_name;
1094 u64 seg_ofs;
1095 u64 seg_len;
1096 int ret;
1097 struct ceph_osd_req_op *ops;
1098 u32 payload_len;
1099
1100 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101 if (!seg_name)
1102 return -ENOMEM;
1103
1104 seg_len = rbd_get_segment(&rbd_dev->header,
1105 rbd_dev->header.block_name,
1106 ofs, len,
1107 seg_name, &seg_ofs);
602adf40
YS
1108
1109 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1110
1111 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112 if (ret < 0)
1113 goto done;
1114
1115 /* we've taken care of segment sizes earlier when we
1116 cloned the bios. We should never have a segment
1117 truncated at this point */
1118 BUG_ON(seg_len < len);
1119
1120 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121 seg_name, seg_ofs, seg_len,
1122 bio,
1123 NULL, 0,
1124 flags,
1125 ops,
1126 num_reply,
1fec7093 1127 coll, coll_index,
59c2be1e 1128 rbd_req_cb, 0, NULL);
11f77002
SW
1129
1130 rbd_destroy_ops(ops);
602adf40
YS
1131done:
1132 kfree(seg_name);
1133 return ret;
1134}
1135
1136/*
1137 * Request async osd write
1138 */
1139static int rbd_req_write(struct request *rq,
1140 struct rbd_device *rbd_dev,
1141 struct ceph_snap_context *snapc,
1142 u64 ofs, u64 len,
1fec7093
YS
1143 struct bio *bio,
1144 struct rbd_req_coll *coll,
1145 int coll_index)
602adf40
YS
1146{
1147 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1148 CEPH_OSD_OP_WRITE,
1149 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1150 2,
1fec7093 1151 ofs, len, bio, coll, coll_index);
602adf40
YS
1152}
1153
1154/*
1155 * Request async osd read
1156 */
1157static int rbd_req_read(struct request *rq,
1158 struct rbd_device *rbd_dev,
1159 u64 snapid,
1160 u64 ofs, u64 len,
1fec7093
YS
1161 struct bio *bio,
1162 struct rbd_req_coll *coll,
1163 int coll_index)
602adf40
YS
1164{
1165 return rbd_do_op(rq, rbd_dev, NULL,
1166 (snapid ? snapid : CEPH_NOSNAP),
1167 CEPH_OSD_OP_READ,
1168 CEPH_OSD_FLAG_READ,
1169 2,
1fec7093 1170 ofs, len, bio, coll, coll_index);
602adf40
YS
1171}
1172
1173/*
1174 * Request sync osd read
1175 */
1176static int rbd_req_sync_read(struct rbd_device *dev,
1177 struct ceph_snap_context *snapc,
1178 u64 snapid,
1179 const char *obj,
1180 u64 ofs, u64 len,
59c2be1e
YS
1181 char *buf,
1182 u64 *ver)
602adf40
YS
1183{
1184 return rbd_req_sync_op(dev, NULL,
1185 (snapid ? snapid : CEPH_NOSNAP),
1186 CEPH_OSD_OP_READ,
1187 CEPH_OSD_FLAG_READ,
1188 NULL,
59c2be1e 1189 1, obj, ofs, len, buf, NULL, ver);
602adf40
YS
1190}
1191
1192/*
59c2be1e
YS
1193 * Request sync osd watch
1194 */
1195static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1196 u64 ver,
1197 u64 notify_id,
1198 const char *obj)
1199{
1200 struct ceph_osd_req_op *ops;
1201 struct page **pages = NULL;
11f77002
SW
1202 int ret;
1203
1204 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
59c2be1e
YS
1205 if (ret < 0)
1206 return ret;
1207
1208 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1209 ops[0].watch.cookie = notify_id;
1210 ops[0].watch.flag = 0;
1211
1212 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1213 obj, 0, 0, NULL,
1214 pages, 0,
1215 CEPH_OSD_FLAG_READ,
1216 ops,
1217 1,
1fec7093 1218 NULL, 0,
59c2be1e
YS
1219 rbd_simple_req_cb, 0, NULL);
1220
1221 rbd_destroy_ops(ops);
1222 return ret;
1223}
1224
1225static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1226{
1227 struct rbd_device *dev = (struct rbd_device *)data;
13143d2d
SW
1228 int rc;
1229
59c2be1e
YS
1230 if (!dev)
1231 return;
1232
1233 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1234 notify_id, (int)opcode);
1235 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
13143d2d 1236 rc = __rbd_update_snaps(dev);
59c2be1e 1237 mutex_unlock(&ctl_mutex);
13143d2d 1238 if (rc)
f0f8cef5
AE
1239 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1240 " update snaps: %d\n", dev->major, rc);
59c2be1e
YS
1241
1242 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1243}
1244
1245/*
1246 * Request sync osd watch
1247 */
1248static int rbd_req_sync_watch(struct rbd_device *dev,
1249 const char *obj,
1250 u64 ver)
1251{
1252 struct ceph_osd_req_op *ops;
1dbb4399 1253 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1254
1255 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1256 if (ret < 0)
1257 return ret;
1258
1259 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1260 (void *)dev, &dev->watch_event);
1261 if (ret < 0)
1262 goto fail;
1263
1264 ops[0].watch.ver = cpu_to_le64(ver);
1265 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1266 ops[0].watch.flag = 1;
1267
1268 ret = rbd_req_sync_op(dev, NULL,
1269 CEPH_NOSNAP,
1270 0,
1271 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1272 ops,
1273 1, obj, 0, 0, NULL,
1274 &dev->watch_request, NULL);
1275
1276 if (ret < 0)
1277 goto fail_event;
1278
1279 rbd_destroy_ops(ops);
1280 return 0;
1281
1282fail_event:
1283 ceph_osdc_cancel_event(dev->watch_event);
1284 dev->watch_event = NULL;
1285fail:
1286 rbd_destroy_ops(ops);
1287 return ret;
1288}
1289
79e3057c
YS
1290/*
1291 * Request sync osd unwatch
1292 */
1293static int rbd_req_sync_unwatch(struct rbd_device *dev,
1294 const char *obj)
1295{
1296 struct ceph_osd_req_op *ops;
1297
1298 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1299 if (ret < 0)
1300 return ret;
1301
1302 ops[0].watch.ver = 0;
1303 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1304 ops[0].watch.flag = 0;
1305
1306 ret = rbd_req_sync_op(dev, NULL,
1307 CEPH_NOSNAP,
1308 0,
1309 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1310 ops,
1311 1, obj, 0, 0, NULL, NULL, NULL);
1312
1313 rbd_destroy_ops(ops);
1314 ceph_osdc_cancel_event(dev->watch_event);
1315 dev->watch_event = NULL;
1316 return ret;
1317}
1318
59c2be1e
YS
1319struct rbd_notify_info {
1320 struct rbd_device *dev;
1321};
1322
1323static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1324{
1325 struct rbd_device *dev = (struct rbd_device *)data;
1326 if (!dev)
1327 return;
1328
1329 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1330 notify_id, (int)opcode);
1331}
1332
1333/*
1334 * Request sync osd notify
1335 */
1336static int rbd_req_sync_notify(struct rbd_device *dev,
1337 const char *obj)
1338{
1339 struct ceph_osd_req_op *ops;
1dbb4399 1340 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1341 struct ceph_osd_event *event;
1342 struct rbd_notify_info info;
1343 int payload_len = sizeof(u32) + sizeof(u32);
1344 int ret;
1345
1346 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1347 if (ret < 0)
1348 return ret;
1349
1350 info.dev = dev;
1351
1352 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1353 (void *)&info, &event);
1354 if (ret < 0)
1355 goto fail;
1356
1357 ops[0].watch.ver = 1;
1358 ops[0].watch.flag = 1;
1359 ops[0].watch.cookie = event->cookie;
1360 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1361 ops[0].watch.timeout = 12;
1362
1363 ret = rbd_req_sync_op(dev, NULL,
1364 CEPH_NOSNAP,
1365 0,
1366 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1367 ops,
1368 1, obj, 0, 0, NULL, NULL, NULL);
1369 if (ret < 0)
1370 goto fail_event;
1371
1372 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1373 dout("ceph_osdc_wait_event returned %d\n", ret);
1374 rbd_destroy_ops(ops);
1375 return 0;
1376
1377fail_event:
1378 ceph_osdc_cancel_event(event);
1379fail:
1380 rbd_destroy_ops(ops);
1381 return ret;
1382}
1383
602adf40
YS
1384/*
1385 * Request sync osd read
1386 */
1387static int rbd_req_sync_exec(struct rbd_device *dev,
1388 const char *obj,
1389 const char *cls,
1390 const char *method,
1391 const char *data,
59c2be1e
YS
1392 int len,
1393 u64 *ver)
602adf40
YS
1394{
1395 struct ceph_osd_req_op *ops;
1396 int cls_len = strlen(cls);
1397 int method_len = strlen(method);
1398 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1399 cls_len + method_len + len);
1400 if (ret < 0)
1401 return ret;
1402
1403 ops[0].cls.class_name = cls;
1404 ops[0].cls.class_len = (__u8)cls_len;
1405 ops[0].cls.method_name = method;
1406 ops[0].cls.method_len = (__u8)method_len;
1407 ops[0].cls.argc = 0;
1408 ops[0].cls.indata = data;
1409 ops[0].cls.indata_len = len;
1410
1411 ret = rbd_req_sync_op(dev, NULL,
1412 CEPH_NOSNAP,
1413 0,
1414 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1415 ops,
59c2be1e 1416 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1417
1418 rbd_destroy_ops(ops);
1419
1420 dout("cls_exec returned %d\n", ret);
1421 return ret;
1422}
1423
1fec7093
YS
1424static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1425{
1426 struct rbd_req_coll *coll =
1427 kzalloc(sizeof(struct rbd_req_coll) +
1428 sizeof(struct rbd_req_status) * num_reqs,
1429 GFP_ATOMIC);
1430
1431 if (!coll)
1432 return NULL;
1433 coll->total = num_reqs;
1434 kref_init(&coll->kref);
1435 return coll;
1436}
1437
602adf40
YS
1438/*
1439 * block device queue callback
1440 */
1441static void rbd_rq_fn(struct request_queue *q)
1442{
1443 struct rbd_device *rbd_dev = q->queuedata;
1444 struct request *rq;
1445 struct bio_pair *bp = NULL;
1446
1447 rq = blk_fetch_request(q);
1448
1449 while (1) {
1450 struct bio *bio;
1451 struct bio *rq_bio, *next_bio = NULL;
1452 bool do_write;
1453 int size, op_size = 0;
1454 u64 ofs;
1fec7093
YS
1455 int num_segs, cur_seg = 0;
1456 struct rbd_req_coll *coll;
602adf40
YS
1457
1458 /* peek at request from block layer */
1459 if (!rq)
1460 break;
1461
1462 dout("fetched request\n");
1463
1464 /* filter out block requests we don't understand */
1465 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466 __blk_end_request_all(rq, 0);
1467 goto next;
1468 }
1469
1470 /* deduce our operation (read, write) */
1471 do_write = (rq_data_dir(rq) == WRITE);
1472
1473 size = blk_rq_bytes(rq);
1474 ofs = blk_rq_pos(rq) * 512ULL;
1475 rq_bio = rq->bio;
1476 if (do_write && rbd_dev->read_only) {
1477 __blk_end_request_all(rq, -EROFS);
1478 goto next;
1479 }
1480
1481 spin_unlock_irq(q->queue_lock);
1482
1483 dout("%s 0x%x bytes at 0x%llx\n",
1484 do_write ? "write" : "read",
1485 size, blk_rq_pos(rq) * 512ULL);
1486
1fec7093
YS
1487 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1488 coll = rbd_alloc_coll(num_segs);
1489 if (!coll) {
1490 spin_lock_irq(q->queue_lock);
1491 __blk_end_request_all(rq, -ENOMEM);
1492 goto next;
1493 }
1494
602adf40
YS
1495 do {
1496 /* a bio clone to be passed down to OSD req */
1497 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1498 op_size = rbd_get_segment(&rbd_dev->header,
1499 rbd_dev->header.block_name,
1500 ofs, size,
1501 NULL, NULL);
1fec7093 1502 kref_get(&coll->kref);
602adf40
YS
1503 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1504 op_size, GFP_ATOMIC);
1505 if (!bio) {
1fec7093
YS
1506 rbd_coll_end_req_index(rq, coll, cur_seg,
1507 -ENOMEM, op_size);
1508 goto next_seg;
602adf40
YS
1509 }
1510
1fec7093 1511
602adf40
YS
1512 /* init OSD command: write or read */
1513 if (do_write)
1514 rbd_req_write(rq, rbd_dev,
1515 rbd_dev->header.snapc,
1516 ofs,
1fec7093
YS
1517 op_size, bio,
1518 coll, cur_seg);
602adf40
YS
1519 else
1520 rbd_req_read(rq, rbd_dev,
1521 cur_snap_id(rbd_dev),
1522 ofs,
1fec7093
YS
1523 op_size, bio,
1524 coll, cur_seg);
602adf40 1525
1fec7093 1526next_seg:
602adf40
YS
1527 size -= op_size;
1528 ofs += op_size;
1529
1fec7093 1530 cur_seg++;
602adf40
YS
1531 rq_bio = next_bio;
1532 } while (size > 0);
1fec7093 1533 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1534
1535 if (bp)
1536 bio_pair_release(bp);
602adf40
YS
1537 spin_lock_irq(q->queue_lock);
1538next:
1539 rq = blk_fetch_request(q);
1540 }
1541}
1542
1543/*
1544 * a queue callback. Makes sure that we don't create a bio that spans across
1545 * multiple osd objects. One exception would be with a single page bios,
1546 * which we handle later at bio_chain_clone
1547 */
1548static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1549 struct bio_vec *bvec)
1550{
1551 struct rbd_device *rbd_dev = q->queuedata;
1552 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1553 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1554 unsigned int bio_sectors = bmd->bi_size >> 9;
1555 int max;
1556
1557 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1558 + bio_sectors)) << 9;
1559 if (max < 0)
1560 max = 0; /* bio_add cannot handle a negative return */
1561 if (max <= bvec->bv_len && bio_sectors == 0)
1562 return bvec->bv_len;
1563 return max;
1564}
1565
1566static void rbd_free_disk(struct rbd_device *rbd_dev)
1567{
1568 struct gendisk *disk = rbd_dev->disk;
1569
1570 if (!disk)
1571 return;
1572
1573 rbd_header_free(&rbd_dev->header);
1574
1575 if (disk->flags & GENHD_FL_UP)
1576 del_gendisk(disk);
1577 if (disk->queue)
1578 blk_cleanup_queue(disk->queue);
1579 put_disk(disk);
1580}
1581
1582/*
1583 * reload the ondisk the header
1584 */
1585static int rbd_read_header(struct rbd_device *rbd_dev,
1586 struct rbd_image_header *header)
1587{
1588 ssize_t rc;
1589 struct rbd_image_header_ondisk *dh;
1590 int snap_count = 0;
1591 u64 snap_names_len = 0;
59c2be1e 1592 u64 ver;
602adf40
YS
1593
1594 while (1) {
1595 int len = sizeof(*dh) +
1596 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1597 snap_names_len;
1598
1599 rc = -ENOMEM;
1600 dh = kmalloc(len, GFP_KERNEL);
1601 if (!dh)
1602 return -ENOMEM;
1603
1604 rc = rbd_req_sync_read(rbd_dev,
1605 NULL, CEPH_NOSNAP,
1606 rbd_dev->obj_md_name,
1607 0, len,
59c2be1e 1608 (char *)dh, &ver);
602adf40
YS
1609 if (rc < 0)
1610 goto out_dh;
1611
1612 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
81e759fb
JD
1613 if (rc < 0) {
1614 if (rc == -ENXIO) {
1615 pr_warning("unrecognized header format"
1616 " for image %s", rbd_dev->obj);
1617 }
602adf40 1618 goto out_dh;
81e759fb 1619 }
602adf40
YS
1620
1621 if (snap_count != header->total_snaps) {
1622 snap_count = header->total_snaps;
1623 snap_names_len = header->snap_names_len;
1624 rbd_header_free(header);
1625 kfree(dh);
1626 continue;
1627 }
1628 break;
1629 }
59c2be1e 1630 header->obj_version = ver;
602adf40
YS
1631
1632out_dh:
1633 kfree(dh);
1634 return rc;
1635}
1636
1637/*
1638 * create a snapshot
1639 */
1640static int rbd_header_add_snap(struct rbd_device *dev,
1641 const char *snap_name,
1642 gfp_t gfp_flags)
1643{
1644 int name_len = strlen(snap_name);
1645 u64 new_snapid;
1646 int ret;
916d4d67 1647 void *data, *p, *e;
59c2be1e 1648 u64 ver;
1dbb4399 1649 struct ceph_mon_client *monc;
602adf40
YS
1650
1651 /* we should create a snapshot only if we're pointing at the head */
1652 if (dev->cur_snap)
1653 return -EINVAL;
1654
1dbb4399
AE
1655 monc = &dev->rbd_client->client->monc;
1656 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1657 dout("created snapid=%lld\n", new_snapid);
1658 if (ret < 0)
1659 return ret;
1660
1661 data = kmalloc(name_len + 16, gfp_flags);
1662 if (!data)
1663 return -ENOMEM;
1664
916d4d67
SW
1665 p = data;
1666 e = data + name_len + 16;
602adf40 1667
916d4d67
SW
1668 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1669 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1670
1671 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1672 data, p - data, &ver);
602adf40 1673
916d4d67 1674 kfree(data);
602adf40
YS
1675
1676 if (ret < 0)
1677 return ret;
1678
1679 dev->header.snapc->seq = new_snapid;
1680
1681 return 0;
1682bad:
1683 return -ERANGE;
1684}
1685
dfc5606d
YS
1686static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1687{
1688 struct rbd_snap *snap;
1689
1690 while (!list_empty(&rbd_dev->snaps)) {
1691 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1692 __rbd_remove_snap_dev(rbd_dev, snap);
1693 }
1694}
1695
602adf40
YS
1696/*
1697 * only read the first part of the ondisk header, without the snaps info
1698 */
dfc5606d 1699static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1700{
1701 int ret;
1702 struct rbd_image_header h;
1703 u64 snap_seq;
59c2be1e 1704 int follow_seq = 0;
602adf40
YS
1705
1706 ret = rbd_read_header(rbd_dev, &h);
1707 if (ret < 0)
1708 return ret;
1709
9db4b3e3
SW
1710 /* resized? */
1711 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1712
602adf40
YS
1713 down_write(&rbd_dev->header.snap_rwsem);
1714
1715 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1716 if (rbd_dev->header.total_snaps &&
1717 rbd_dev->header.snapc->snaps[0] == snap_seq)
1718 /* pointing at the head, will need to follow that
1719 if head moves */
1720 follow_seq = 1;
602adf40
YS
1721
1722 kfree(rbd_dev->header.snapc);
1723 kfree(rbd_dev->header.snap_names);
1724 kfree(rbd_dev->header.snap_sizes);
1725
1726 rbd_dev->header.total_snaps = h.total_snaps;
1727 rbd_dev->header.snapc = h.snapc;
1728 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1729 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1730 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1731 if (follow_seq)
1732 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1733 else
1734 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1735
dfc5606d
YS
1736 ret = __rbd_init_snaps_header(rbd_dev);
1737
602adf40
YS
1738 up_write(&rbd_dev->header.snap_rwsem);
1739
dfc5606d 1740 return ret;
602adf40
YS
1741}
1742
1743static int rbd_init_disk(struct rbd_device *rbd_dev)
1744{
1745 struct gendisk *disk;
1746 struct request_queue *q;
1747 int rc;
1748 u64 total_size = 0;
1749
1750 /* contact OSD, request size info about the object being mapped */
1751 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1752 if (rc)
1753 return rc;
1754
dfc5606d
YS
1755 /* no need to lock here, as rbd_dev is not registered yet */
1756 rc = __rbd_init_snaps_header(rbd_dev);
1757 if (rc)
1758 return rc;
1759
cc9d734c 1760 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1761 if (rc)
1762 return rc;
1763
1764 /* create gendisk info */
1765 rc = -ENOMEM;
1766 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1767 if (!disk)
1768 goto out;
1769
f0f8cef5 1770 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
aedfec59 1771 rbd_dev->id);
602adf40
YS
1772 disk->major = rbd_dev->major;
1773 disk->first_minor = 0;
1774 disk->fops = &rbd_bd_ops;
1775 disk->private_data = rbd_dev;
1776
1777 /* init rq */
1778 rc = -ENOMEM;
1779 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1780 if (!q)
1781 goto out_disk;
029bcbd8
JD
1782
1783 /* set io sizes to object size */
1784 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1785 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1786 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1787 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1788
602adf40
YS
1789 blk_queue_merge_bvec(q, rbd_merge_bvec);
1790 disk->queue = q;
1791
1792 q->queuedata = rbd_dev;
1793
1794 rbd_dev->disk = disk;
1795 rbd_dev->q = q;
1796
1797 /* finally, announce the disk to the world */
1798 set_capacity(disk, total_size / 512ULL);
1799 add_disk(disk);
1800
1801 pr_info("%s: added with size 0x%llx\n",
1802 disk->disk_name, (unsigned long long)total_size);
1803 return 0;
1804
1805out_disk:
1806 put_disk(disk);
1807out:
1808 return rc;
1809}
1810
dfc5606d
YS
1811/*
1812 sysfs
1813*/
1814
1815static ssize_t rbd_size_show(struct device *dev,
1816 struct device_attribute *attr, char *buf)
1817{
1818 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1819
1820 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1821}
1822
1823static ssize_t rbd_major_show(struct device *dev,
1824 struct device_attribute *attr, char *buf)
1825{
1826 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1827
dfc5606d
YS
1828 return sprintf(buf, "%d\n", rbd_dev->major);
1829}
1830
1831static ssize_t rbd_client_id_show(struct device *dev,
1832 struct device_attribute *attr, char *buf)
602adf40 1833{
dfc5606d
YS
1834 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1835
1dbb4399
AE
1836 return sprintf(buf, "client%lld\n",
1837 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1838}
1839
dfc5606d
YS
1840static ssize_t rbd_pool_show(struct device *dev,
1841 struct device_attribute *attr, char *buf)
602adf40 1842{
dfc5606d
YS
1843 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1844
1845 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1846}
1847
1848static ssize_t rbd_name_show(struct device *dev,
1849 struct device_attribute *attr, char *buf)
1850{
1851 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1852
1853 return sprintf(buf, "%s\n", rbd_dev->obj);
1854}
1855
1856static ssize_t rbd_snap_show(struct device *dev,
1857 struct device_attribute *attr,
1858 char *buf)
1859{
1860 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1861
1862 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1863}
1864
1865static ssize_t rbd_image_refresh(struct device *dev,
1866 struct device_attribute *attr,
1867 const char *buf,
1868 size_t size)
1869{
1870 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1871 int rc;
1872 int ret = size;
602adf40
YS
1873
1874 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1875
dfc5606d
YS
1876 rc = __rbd_update_snaps(rbd_dev);
1877 if (rc < 0)
1878 ret = rc;
602adf40 1879
dfc5606d
YS
1880 mutex_unlock(&ctl_mutex);
1881 return ret;
1882}
602adf40 1883
dfc5606d
YS
1884static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1885static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1886static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1887static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1888static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1889static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1890static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1891static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1892
1893static struct attribute *rbd_attrs[] = {
1894 &dev_attr_size.attr,
1895 &dev_attr_major.attr,
1896 &dev_attr_client_id.attr,
1897 &dev_attr_pool.attr,
1898 &dev_attr_name.attr,
1899 &dev_attr_current_snap.attr,
1900 &dev_attr_refresh.attr,
1901 &dev_attr_create_snap.attr,
dfc5606d
YS
1902 NULL
1903};
1904
1905static struct attribute_group rbd_attr_group = {
1906 .attrs = rbd_attrs,
1907};
1908
1909static const struct attribute_group *rbd_attr_groups[] = {
1910 &rbd_attr_group,
1911 NULL
1912};
1913
1914static void rbd_sysfs_dev_release(struct device *dev)
1915{
1916}
1917
1918static struct device_type rbd_device_type = {
1919 .name = "rbd",
1920 .groups = rbd_attr_groups,
1921 .release = rbd_sysfs_dev_release,
1922};
1923
1924
1925/*
1926 sysfs - snapshots
1927*/
1928
1929static ssize_t rbd_snap_size_show(struct device *dev,
1930 struct device_attribute *attr,
1931 char *buf)
1932{
1933 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1934
1935 return sprintf(buf, "%lld\n", (long long)snap->size);
1936}
1937
1938static ssize_t rbd_snap_id_show(struct device *dev,
1939 struct device_attribute *attr,
1940 char *buf)
1941{
1942 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1943
1944 return sprintf(buf, "%lld\n", (long long)snap->id);
1945}
1946
1947static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1948static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1949
1950static struct attribute *rbd_snap_attrs[] = {
1951 &dev_attr_snap_size.attr,
1952 &dev_attr_snap_id.attr,
1953 NULL,
1954};
1955
1956static struct attribute_group rbd_snap_attr_group = {
1957 .attrs = rbd_snap_attrs,
1958};
1959
1960static void rbd_snap_dev_release(struct device *dev)
1961{
1962 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1963 kfree(snap->name);
1964 kfree(snap);
1965}
1966
1967static const struct attribute_group *rbd_snap_attr_groups[] = {
1968 &rbd_snap_attr_group,
1969 NULL
1970};
1971
1972static struct device_type rbd_snap_device_type = {
1973 .groups = rbd_snap_attr_groups,
1974 .release = rbd_snap_dev_release,
1975};
1976
1977static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1978 struct rbd_snap *snap)
1979{
1980 list_del(&snap->node);
1981 device_unregister(&snap->dev);
1982}
1983
1984static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1985 struct rbd_snap *snap,
1986 struct device *parent)
1987{
1988 struct device *dev = &snap->dev;
1989 int ret;
1990
1991 dev->type = &rbd_snap_device_type;
1992 dev->parent = parent;
1993 dev->release = rbd_snap_dev_release;
1994 dev_set_name(dev, "snap_%s", snap->name);
1995 ret = device_register(dev);
1996
1997 return ret;
1998}
1999
2000static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2001 int i, const char *name,
2002 struct rbd_snap **snapp)
2003{
2004 int ret;
2005 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2006 if (!snap)
2007 return -ENOMEM;
2008 snap->name = kstrdup(name, GFP_KERNEL);
2009 snap->size = rbd_dev->header.snap_sizes[i];
2010 snap->id = rbd_dev->header.snapc->snaps[i];
2011 if (device_is_registered(&rbd_dev->dev)) {
2012 ret = rbd_register_snap_dev(rbd_dev, snap,
2013 &rbd_dev->dev);
2014 if (ret < 0)
2015 goto err;
2016 }
2017 *snapp = snap;
2018 return 0;
2019err:
2020 kfree(snap->name);
2021 kfree(snap);
2022 return ret;
2023}
2024
2025/*
2026 * search for the previous snap in a null delimited string list
2027 */
/*
 * Walk backward in a NUL-delimited name list: given a pointer to the
 * current name, return the start of the previous one, or NULL when
 * there is no earlier name before @start.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* need at least a one-char name plus its NUL before @name */
	if (name < start + 2)
		return NULL;

	/* skip the NUL that terminates the previous name */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;	/* first character after the NUL we hit */
}
2041
2042/*
2043 * compare the old list of snapshots that we have to what's in the header
2044 * and update it accordingly. Note that the header holds the snapshots
2045 * in a reverse order (from newest to oldest) and we need to go from
2046 * older to new so that we don't get a duplicate snap name when
2047 * doing the process (e.g., removed snapshot and recreated a new
2048 * one with the same name.
2049 */
2050static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2051{
2052 const char *name, *first_name;
2053 int i = rbd_dev->header.total_snaps;
2054 struct rbd_snap *snap, *old_snap = NULL;
2055 int ret;
2056 struct list_head *p, *n;
2057
2058 first_name = rbd_dev->header.snap_names;
2059 name = first_name + rbd_dev->header.snap_names_len;
2060
2061 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2062 u64 cur_id;
2063
2064 old_snap = list_entry(p, struct rbd_snap, node);
2065
2066 if (i)
2067 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2068
2069 if (!i || old_snap->id < cur_id) {
2070 /* old_snap->id was skipped, thus was removed */
2071 __rbd_remove_snap_dev(rbd_dev, old_snap);
2072 continue;
2073 }
2074 if (old_snap->id == cur_id) {
2075 /* we have this snapshot already */
2076 i--;
2077 name = rbd_prev_snap_name(name, first_name);
2078 continue;
2079 }
2080 for (; i > 0;
2081 i--, name = rbd_prev_snap_name(name, first_name)) {
2082 if (!name) {
2083 WARN_ON(1);
2084 return -EINVAL;
2085 }
2086 cur_id = rbd_dev->header.snapc->snaps[i];
2087 /* snapshot removal? handle it above */
2088 if (cur_id >= old_snap->id)
2089 break;
2090 /* a new snapshot */
2091 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2092 if (ret < 0)
2093 return ret;
2094
2095 /* note that we add it backward so using n and not p */
2096 list_add(&snap->node, n);
2097 p = &snap->node;
2098 }
2099 }
2100 /* we're done going over the old snap list, just add what's left */
2101 for (; i > 0; i--) {
2102 name = rbd_prev_snap_name(name, first_name);
2103 if (!name) {
2104 WARN_ON(1);
2105 return -EINVAL;
2106 }
2107 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2108 if (ret < 0)
2109 return ret;
2110 list_add(&snap->node, &rbd_dev->snaps);
2111 }
2112
2113 return 0;
2114}
2115
dfc5606d
YS
2116static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2117{
f0f8cef5 2118 int ret;
dfc5606d
YS
2119 struct device *dev;
2120 struct rbd_snap *snap;
2121
2122 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2123 dev = &rbd_dev->dev;
2124
2125 dev->bus = &rbd_bus_type;
2126 dev->type = &rbd_device_type;
2127 dev->parent = &rbd_root_dev;
2128 dev->release = rbd_dev_release;
2129 dev_set_name(dev, "%d", rbd_dev->id);
2130 ret = device_register(dev);
2131 if (ret < 0)
f0f8cef5 2132 goto out;
dfc5606d
YS
2133
2134 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2135 ret = rbd_register_snap_dev(rbd_dev, snap,
2136 &rbd_dev->dev);
2137 if (ret < 0)
602adf40
YS
2138 break;
2139 }
f0f8cef5 2140out:
dfc5606d
YS
2141 mutex_unlock(&ctl_mutex);
2142 return ret;
602adf40
YS
2143}
2144
dfc5606d
YS
2145static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2146{
2147 device_unregister(&rbd_dev->dev);
2148}
2149
59c2be1e
YS
2150static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2151{
2152 int ret, rc;
2153
2154 do {
2155 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2156 rbd_dev->header.obj_version);
2157 if (ret == -ERANGE) {
2158 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2159 rc = __rbd_update_snaps(rbd_dev);
2160 mutex_unlock(&ctl_mutex);
2161 if (rc < 0)
2162 return rc;
2163 }
2164 } while (ret == -ERANGE);
2165
2166 return ret;
2167}
2168
1ddbe94e
AE
2169static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2170
2171/*
499afd5b
AE
2172 * Get a unique rbd identifier for the given new rbd_dev, and add
2173 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2174 */
499afd5b 2175static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2176{
499afd5b
AE
2177 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2178
2179 spin_lock(&rbd_dev_list_lock);
2180 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2181 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2182}
b7f23c36 2183
1ddbe94e 2184/*
499afd5b
AE
2185 * Remove an rbd_dev from the global list, and record that its
2186 * identifier is no longer in use.
1ddbe94e 2187 */
499afd5b 2188static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2189{
d184f6bf
AE
2190 struct list_head *tmp;
2191 int rbd_id = rbd_dev->id;
2192 int max_id;
2193
2194 BUG_ON(rbd_id < 1);
499afd5b
AE
2195
2196 spin_lock(&rbd_dev_list_lock);
2197 list_del_init(&rbd_dev->node);
d184f6bf
AE
2198
2199 /*
2200 * If the id being "put" is not the current maximum, there
2201 * is nothing special we need to do.
2202 */
2203 if (rbd_id != atomic64_read(&rbd_id_max)) {
2204 spin_unlock(&rbd_dev_list_lock);
2205 return;
2206 }
2207
2208 /*
2209 * We need to update the current maximum id. Search the
2210 * list to find out what it is. We're more likely to find
2211 * the maximum at the end, so search the list backward.
2212 */
2213 max_id = 0;
2214 list_for_each_prev(tmp, &rbd_dev_list) {
2215 struct rbd_device *rbd_dev;
2216
2217 rbd_dev = list_entry(tmp, struct rbd_device, node);
2218 if (rbd_id > max_id)
2219 max_id = rbd_id;
2220 }
499afd5b 2221 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2222
1ddbe94e 2223 /*
d184f6bf
AE
2224 * The max id could have been updated by rbd_id_get(), in
2225 * which case it now accurately reflects the new maximum.
2226 * Be careful not to overwrite the maximum value in that
2227 * case.
1ddbe94e 2228 */
d184f6bf 2229 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2230}
2231
e28fff26
AE
2232/*
2233 * Skips over white space at *buf, and updates *buf to point to the
2234 * first found non-space character (if any). Returns the length of
2235 * the token (string of non-white space characters) found.
2236 */
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.
 */
static inline size_t next_token(const char **buf)
{
	/* characters that isspace() flags in the "C"/"POSIX" locales */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* find start of token */

	return strcspn(*buf, spaces);	/* return token length */
}

/*
 * Finds the next token in *buf and, if the provided buffer is big
 * enough, copies it there NUL-terminated.  Returns the token length
 * (>= token_size means it did not fit and was not copied; 0 means no
 * token).  *buf is advanced past the token either way.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2278
a725f65e
AE
2279/*
2280 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2281 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2282 * on the list of monitor addresses and other options provided via
2283 * /sys/bus/rbd/add.
2284 */
2285static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2286 const char *buf,
2287 char *mon_addrs,
5214ecc4 2288 size_t *mon_addrs_size,
e28fff26
AE
2289 char *options,
2290 size_t options_size)
2291{
2292 size_t len;
2293
2294 /* The first four tokens are required */
2295
5214ecc4
AE
2296 len = copy_token(&buf, mon_addrs, *mon_addrs_size);
2297 if (!len || len >= *mon_addrs_size)
a725f65e 2298 return -EINVAL;
5214ecc4 2299 *mon_addrs_size = len + 1;
a725f65e 2300
e28fff26
AE
2301 len = copy_token(&buf, options, options_size);
2302 if (!len || len >= options_size)
2303 return -EINVAL;
2304
2305 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2306 if (!len || len >= sizeof (rbd_dev->pool_name))
2307 return -EINVAL;
2308
2309 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2310 if (!len || len >= sizeof (rbd_dev->obj))
2311 return -EINVAL;
2312
2313 /* We have the object length in hand, save it. */
2314
2315 rbd_dev->obj_len = len;
a725f65e 2316
81a89793
AE
2317 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2318 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2319 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
a725f65e 2320
e28fff26
AE
2321 /*
2322 * The snapshot name is optional, but it's an error if it's
2323 * too long. If no snapshot is supplied, fill in the default.
2324 */
2325 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2326 if (!len)
2327 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2328 sizeof (RBD_SNAP_HEAD_NAME));
2329 else if (len >= sizeof (rbd_dev->snap_name))
2330 return -EINVAL;
2331
a725f65e
AE
2332 return 0;
2333}
2334
59c2be1e
YS
/*
 * rbd_add() - handle a write to /sys/bus/rbd/add.
 *
 * Parses "mon_addrs options pool obj [snap]" from @buf, creates and
 * registers a new rbd device, and sets up its block device mapping.
 * Returns @count on success or a negative errno on failure.
 *
 * Cleanup on failure is via the cascading labels at the bottom; note
 * the asymmetry: once rbd_bus_add_dev() has succeeded, teardown goes
 * through rbd_bus_del_dev(), whose release callback frees rbd_dev and
 * drops the module reference, so err_out_bus must not kfree(rbd_dev)
 * or call module_put() itself.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev;
	char *mon_addrs = NULL;
	size_t mon_addrs_size;
	char *options = NULL;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Hold a module reference for the lifetime of the device. */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/*
	 * Both scratch buffers are sized by @count: each parsed token
	 * is strictly shorter than the whole input, so this is always
	 * large enough.
	 */
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;
	mon_addrs = kmalloc(count, GFP_KERNEL);
	if (!mon_addrs)
		goto err_nomem;
	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);

	init_rwsem(&rbd_dev->header.snap_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);

	/* parse add command */
	mon_addrs_size = count;		/* in: capacity; out: strlen + 1 */
	rc = rbd_add_parse_args(rbd_dev, buf, mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size - 1 is the string length, without the '\0' */
	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->poolid = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);	/* 0 = dynamic major */
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/* set up and announce blkdev mapping */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	/*
	 * NOTE(review): mon_addrs and options are not freed on this
	 * success path.  Unless rbd_get_client() takes ownership of
	 * these buffers (can't tell from here), this looks like a
	 * per-add memory leak — verify against rbd_get_client().
	 */
	return count;

err_out_bus:
	rbd_id_put(rbd_dev);

	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	kfree(mon_addrs);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	rbd_id_put(rbd_dev);
err_nomem:
	/* kfree(NULL) is a no-op, so unallocated buffers are fine here */
	kfree(options);
	kfree(mon_addrs);
	kfree(rbd_dev);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2442
2443static struct rbd_device *__rbd_get_dev(unsigned long id)
2444{
2445 struct list_head *tmp;
2446 struct rbd_device *rbd_dev;
2447
e124a82f 2448 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2449 list_for_each(tmp, &rbd_dev_list) {
2450 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2451 if (rbd_dev->id == id) {
2452 spin_unlock(&rbd_dev_list_lock);
602adf40 2453 return rbd_dev;
e124a82f 2454 }
602adf40 2455 }
e124a82f 2456 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2457 return NULL;
2458}
2459
dfc5606d 2460static void rbd_dev_release(struct device *dev)
602adf40 2461{
dfc5606d
YS
2462 struct rbd_device *rbd_dev =
2463 container_of(dev, struct rbd_device, dev);
602adf40 2464
1dbb4399
AE
2465 if (rbd_dev->watch_request) {
2466 struct ceph_client *client = rbd_dev->rbd_client->client;
2467
2468 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2469 rbd_dev->watch_request);
1dbb4399 2470 }
59c2be1e 2471 if (rbd_dev->watch_event)
79e3057c 2472 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
59c2be1e 2473
602adf40
YS
2474 rbd_put_client(rbd_dev);
2475
2476 /* clean up and free blkdev */
2477 rbd_free_disk(rbd_dev);
2478 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2479 kfree(rbd_dev);
2480
2481 /* release module ref */
2482 module_put(THIS_MODULE);
602adf40
YS
2483}
2484
dfc5606d
YS
2485static ssize_t rbd_remove(struct bus_type *bus,
2486 const char *buf,
2487 size_t count)
602adf40
YS
2488{
2489 struct rbd_device *rbd_dev = NULL;
2490 int target_id, rc;
2491 unsigned long ul;
2492 int ret = count;
2493
2494 rc = strict_strtoul(buf, 10, &ul);
2495 if (rc)
2496 return rc;
2497
2498 /* convert to int; abort if we lost anything in the conversion */
2499 target_id = (int) ul;
2500 if (target_id != ul)
2501 return -EINVAL;
2502
2503 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2504
2505 rbd_dev = __rbd_get_dev(target_id);
2506 if (!rbd_dev) {
2507 ret = -ENOENT;
2508 goto done;
2509 }
2510
499afd5b 2511 rbd_id_put(rbd_dev);
dfc5606d
YS
2512
2513 __rbd_remove_all_snaps(rbd_dev);
2514 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2515
2516done:
2517 mutex_unlock(&ctl_mutex);
2518 return ret;
2519}
2520
dfc5606d
YS
2521static ssize_t rbd_snap_add(struct device *dev,
2522 struct device_attribute *attr,
2523 const char *buf,
2524 size_t count)
602adf40 2525{
dfc5606d
YS
2526 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2527 int ret;
2528 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2529 if (!name)
2530 return -ENOMEM;
2531
dfc5606d 2532 snprintf(name, count, "%s", buf);
602adf40
YS
2533
2534 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2535
602adf40
YS
2536 ret = rbd_header_add_snap(rbd_dev,
2537 name, GFP_KERNEL);
2538 if (ret < 0)
59c2be1e 2539 goto err_unlock;
602adf40 2540
dfc5606d 2541 ret = __rbd_update_snaps(rbd_dev);
602adf40 2542 if (ret < 0)
59c2be1e
YS
2543 goto err_unlock;
2544
2545 /* shouldn't hold ctl_mutex when notifying.. notify might
2546 trigger a watch callback that would need to get that mutex */
2547 mutex_unlock(&ctl_mutex);
2548
2549 /* make a best effort, don't error if failed */
2550 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2551
2552 ret = count;
59c2be1e
YS
2553 kfree(name);
2554 return ret;
2555
2556err_unlock:
602adf40 2557 mutex_unlock(&ctl_mutex);
602adf40
YS
2558 kfree(name);
2559 return ret;
2560}
2561
602adf40
YS
2562/*
2563 * create control files in sysfs
dfc5606d 2564 * /sys/bus/rbd/...
602adf40
YS
2565 */
2566static int rbd_sysfs_init(void)
2567{
dfc5606d 2568 int ret;
602adf40 2569
dfc5606d 2570 ret = bus_register(&rbd_bus_type);
21079786 2571 if (ret < 0)
dfc5606d 2572 return ret;
602adf40 2573
dfc5606d 2574 ret = device_register(&rbd_root_dev);
602adf40 2575
602adf40
YS
2576 return ret;
2577}
2578
/* Tear down sysfs state: unregister the root device, then the bus. */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2584
2585int __init rbd_init(void)
2586{
2587 int rc;
2588
2589 rc = rbd_sysfs_init();
2590 if (rc)
2591 return rc;
f0f8cef5 2592 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2593 return 0;
2594}
2595
/* Module exit point: undo everything rbd_init() registered. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2600
/* Module entry/exit hooks and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");