4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11 from Logicworks, Inc. for making SDP replication support possible.
13 drbd is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation; either version 2, or (at your option)
18 drbd is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
23 You should have received a copy of the GNU General Public License
24 along with drbd; see the file COPYING. If not, write to
25 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
53 #include <linux/drbd_limits.h>
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73 "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85 * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
94 static int fault_count;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
110 int proc_details; /* Detail level in proc drbd */
112 /* Module parameter for setting the user mode helper program
113 * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119 * as member "struct gendisk *vdisk;"
122 struct list_head drbd_tconns; /* list of struct drbd_tconn */
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache; /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
133 /* I do not use a standard mempool, because:
134 1) I want to hand out the pre-allocated objects first.
135 2) I want to be able to interrupt sleeping allocation with a signal.
136 Note: This is a singly linked list; the next pointer is the private
137 member of struct page.
139 struct page *drbd_pp_pool;
140 spinlock_t drbd_pp_lock;
142 wait_queue_head_t drbd_pp_wait;
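/* A minimal sketch (not part of the build) of how the hand-rolled pool above
 * chains pages through their page_private field, as described in the comment.
 * The helpers drbd_pp_push()/drbd_pp_pop() are hypothetical names used only
 * for illustration; the real producers and consumers are drbd_create_mempools(),
 * drbd_destroy_mempools() and the peer-request page allocation code. */
#if 0
static void drbd_pp_push(struct page *page)
{
	spin_lock(&drbd_pp_lock);
	/* page->private points at the previous head of the chain */
	set_page_private(page, (unsigned long)drbd_pp_pool);
	drbd_pp_pool = page;
	spin_unlock(&drbd_pp_lock);
}

static struct page *drbd_pp_pop(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page)
		/* advance the head to the next page in the chain */
		drbd_pp_pool = (struct page *)page_private(page);
	spin_unlock(&drbd_pp_lock);
	return page;
}
#endif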
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
146 static const struct block_device_operations drbd_ops = {
147 .owner = THIS_MODULE,
149 .release = drbd_release,
152 static void bio_destructor_drbd(struct bio *bio)
154 bio_free(bio, drbd_md_io_bio_set);
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
161 if (!drbd_md_io_bio_set)
162 return bio_alloc(gfp_mask, 1);
164 bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
167 bio->bi_destructor = bio_destructor_drbd;
172 /* When checking with sparse, and this is an inline function, sparse will
173 give tons of false positives. When this is a real function, sparse works.
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
179 atomic_inc(&mdev->local_cnt);
180 io_allowed = (mdev->state.disk >= mins);
182 if (atomic_dec_and_test(&mdev->local_cnt))
183 wake_up(&mdev->misc_wait);
191 * DOC: The transfer log
193 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195 * of the list. There is always at least one &struct drbd_tl_epoch object.
197 * Each &struct drbd_tl_epoch has a circular double linked list of requests
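 *
 * A rough sketch of walking that structure (illustrative only; the real code
 * always does this under tconn->req_lock): epochs are chained from
 * tconn->oldest_tle to tconn->newest_tle via their ->next pointer, and each
 * epoch carries its own ->requests list:
 *
 *	struct drbd_tl_epoch *b;
 *	struct drbd_request *req;
 *
 *	for (b = tconn->oldest_tle; b; b = b->next) {
 *		list_for_each_entry(req, &b->requests, tl_requests)
 *			;	// inspect one request of this epoch
 *		if (b == tconn->newest_tle)
 *			break;
 *	}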
200 static int tl_init(struct drbd_tconn *tconn)
202 struct drbd_tl_epoch *b;
204 /* during device minor initialization, we may well use GFP_KERNEL */
205 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
208 INIT_LIST_HEAD(&b->requests);
209 INIT_LIST_HEAD(&b->w.list);
211 b->br_number = atomic_inc_return(&tconn->current_tle_nr);
213 b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215 tconn->oldest_tle = b;
216 tconn->newest_tle = b;
217 INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218 INIT_LIST_HEAD(&tconn->barrier_acked_requests);
223 static void tl_cleanup(struct drbd_tconn *tconn)
225 if (tconn->oldest_tle != tconn->newest_tle)
226 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227 if (!list_empty(&tconn->out_of_sequence_requests))
228 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229 kfree(tconn->oldest_tle);
230 tconn->oldest_tle = NULL;
231 kfree(tconn->unused_spare_tle);
232 tconn->unused_spare_tle = NULL;
236 * _tl_add_barrier() - Adds a barrier to the transfer log
237 * @tconn: DRBD connection.
238 * @new: Barrier to be added before the current head of the TL.
240 * The caller must hold the req_lock.
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
244 INIT_LIST_HEAD(&new->requests);
245 INIT_LIST_HEAD(&new->w.list);
246 new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
250 new->br_number = atomic_inc_return(&tconn->current_tle_nr);
251 if (tconn->newest_tle != new) {
252 tconn->newest_tle->next = new;
253 tconn->newest_tle = new;
258 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259 * @tconn: DRBD connection.
260 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
261 * @set_size: Expected number of requests before that barrier.
263 * In case the passed barrier_nr or set_size does not match the oldest
264 * &struct drbd_tl_epoch object, this function will cause a termination of the connection.
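 *
 * For example (numbers purely illustrative): if the peer acks barrier #5 and
 * claims it covered 12 writes, but the oldest epoch in the transfer log has a
 * different br_number or n_writes, the connection is torn down with
 * C_PROTOCOL_ERROR.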
267 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
268 unsigned int set_size)
270 struct drbd_conf *mdev;
271 struct drbd_tl_epoch *b, *nob; /* next old barrier */
272 struct list_head *le, *tle;
273 struct drbd_request *r;
275 spin_lock_irq(&tconn->req_lock);
277 b = tconn->oldest_tle;
279 /* first some paranoia code */
281 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
285 if (b->br_number != barrier_nr) {
286 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
287 barrier_nr, b->br_number);
290 if (b->n_writes != set_size) {
291 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
292 barrier_nr, set_size, b->n_writes);
296 /* Clean up list of requests processed during current epoch */
297 list_for_each_safe(le, tle, &b->requests) {
298 r = list_entry(le, struct drbd_request, tl_requests);
299 _req_mod(r, BARRIER_ACKED);
301 /* There could be requests on the list waiting for completion
302 of the write to the local disk. To avoid corruption of the
303 slab's data structures we have to remove the list's head.
305 Also there could have been a barrier ack out of sequence, overtaking
306 the write acks - which would be a bug and would violate write ordering.
307 To not deadlock in case we lose connection while such requests are
308 still pending, we need some way to find them for the
309 _req_mod(CONNECTION_LOST_WHILE_PENDING).
311 These have been list_move'd to the out_of_sequence_requests list in
312 _req_mod(, BARRIER_ACKED) above.
314 list_splice_init(&b->requests, &tconn->barrier_acked_requests);
318 if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
319 _tl_add_barrier(tconn, b);
321 tconn->oldest_tle = nob;
322 /* if nob == NULL b was the only barrier, and becomes the new
323 barrier. Therefore tconn->oldest_tle points already to b */
325 D_ASSERT(nob != NULL);
326 tconn->oldest_tle = nob;
330 spin_unlock_irq(&tconn->req_lock);
331 dec_ap_pending(mdev);
336 spin_unlock_irq(&tconn->req_lock);
337 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 * _tl_restart() - Walks the transfer log and applies an action to all requests
343 * @tconn: DRBD connection.
344 * @what: The action/event to perform with all request objects
346 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
347 * RESTART_FROZEN_DISK_IO.
349 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
351 struct drbd_tl_epoch *b, *tmp, **pn;
352 struct list_head *le, *tle, carry_reads;
353 struct drbd_request *req;
354 int rv, n_writes, n_reads;
356 b = tconn->oldest_tle;
357 pn = &tconn->oldest_tle;
361 INIT_LIST_HEAD(&carry_reads);
362 list_for_each_safe(le, tle, &b->requests) {
363 req = list_entry(le, struct drbd_request, tl_requests);
364 rv = _req_mod(req, what);
374 if (what == RESEND) {
375 b->n_writes = n_writes;
376 if (b->w.cb == NULL) {
377 b->w.cb = w_send_barrier;
378 inc_ap_pending(b->w.mdev);
379 set_bit(CREATE_BARRIER, &tconn->flags);
382 drbd_queue_work(&tconn->sender_work, &b->w);
387 list_add(&carry_reads, &b->requests);
388 /* there could still be requests on that ring list,
389 * in case local io is still pending */
390 list_del(&b->requests);
392 /* dec_ap_pending corresponding to queue_barrier.
393 * the newest barrier may not have been queued yet,
394 * in which case w.cb is still NULL. */
396 dec_ap_pending(b->w.mdev);
398 if (b == tconn->newest_tle) {
399 /* recycle, but reinit! */
401 conn_err(tconn, "ASSERT FAILED tmp == NULL");
402 INIT_LIST_HEAD(&b->requests);
403 list_splice(&carry_reads, &b->requests);
404 INIT_LIST_HEAD(&b->w.list);
406 b->br_number = atomic_inc_return(&tconn->current_tle_nr);
416 list_splice(&carry_reads, &b->requests);
419 /* Actions operating on the disk state also want to work on
420 requests that got barrier acked. */
422 case FAIL_FROZEN_DISK_IO:
423 case RESTART_FROZEN_DISK_IO:
424 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
425 req = list_entry(le, struct drbd_request, tl_requests);
428 case CONNECTION_LOST_WHILE_PENDING:
432 conn_err(tconn, "what = %d in _tl_restart()\n", what);
437 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
438 * @tconn: DRBD connection.
440 * This is called after the connection to the peer was lost. The storage covered
441 * by the requests on the transfer log gets marked as out of sync. Called from the
442 * receiver thread and the worker thread.
444 void tl_clear(struct drbd_tconn *tconn)
446 struct list_head *le, *tle;
447 struct drbd_request *r;
449 spin_lock_irq(&tconn->req_lock);
451 _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
453 /* we expect this list to be empty. */
454 if (!list_empty(&tconn->out_of_sequence_requests))
455 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
457 /* but just in case, clean it up anyways! */
458 list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
459 r = list_entry(le, struct drbd_request, tl_requests);
460 /* It would be nice to complete outside of spinlock.
461 * But this is easier for now. */
462 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
465 /* ensure bit indicating barrier is required is clear */
466 clear_bit(CREATE_BARRIER, &tconn->flags);
468 spin_unlock_irq(&tconn->req_lock);
471 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
473 spin_lock_irq(&tconn->req_lock);
474 _tl_restart(tconn, what);
475 spin_unlock_irq(&tconn->req_lock);
479 * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
480 * @mdev: DRBD device.
482 void tl_abort_disk_io(struct drbd_conf *mdev)
484 struct drbd_tconn *tconn = mdev->tconn;
485 struct drbd_tl_epoch *b;
486 struct list_head *le, *tle;
487 struct drbd_request *req;
489 spin_lock_irq(&tconn->req_lock);
490 b = tconn->oldest_tle;
492 list_for_each_safe(le, tle, &b->requests) {
493 req = list_entry(le, struct drbd_request, tl_requests);
494 if (!(req->rq_state & RQ_LOCAL_PENDING))
496 if (req->w.mdev == mdev)
497 _req_mod(req, ABORT_DISK_IO);
502 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
503 req = list_entry(le, struct drbd_request, tl_requests);
504 if (!(req->rq_state & RQ_LOCAL_PENDING))
506 if (req->w.mdev == mdev)
507 _req_mod(req, ABORT_DISK_IO);
510 spin_unlock_irq(&tconn->req_lock);
513 static int drbd_thread_setup(void *arg)
515 struct drbd_thread *thi = (struct drbd_thread *) arg;
516 struct drbd_tconn *tconn = thi->tconn;
520 snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
521 thi->name[0], thi->tconn->name);
524 retval = thi->function(thi);
526 spin_lock_irqsave(&thi->t_lock, flags);
528 /* if the receiver has been "EXITING", the last thing it did
529 * was set the conn state to "StandAlone",
530 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
531 * and receiver thread will be "started".
532 * drbd_thread_start needs to set "RESTARTING" in that case.
533 * t_state check and assignment needs to be within the same spinlock,
534 * so either thread_start sees EXITING, and can remap to RESTARTING,
535 * or thread_start sees NONE, and can proceed as normal.
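 *
 * A rough sketch of the thread state transitions involved (not exhaustive):
 *   NONE       -> RUNNING     in drbd_thread_start()
 *   RUNNING    -> EXITING     in _drbd_thread_stop()
 *   RUNNING    -> RESTARTING  in _drbd_thread_stop() with restart set
 *   EXITING    -> RESTARTING  in drbd_thread_start() (the case described above)
 *   RESTARTING -> RUNNING     right here, re-entering thi->function()
 *   EXITING    -> NONE        right here, when the thread finally terminates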
538 if (thi->t_state == RESTARTING) {
539 conn_info(tconn, "Restarting %s thread\n", thi->name);
540 thi->t_state = RUNNING;
541 spin_unlock_irqrestore(&thi->t_lock, flags);
548 complete_all(&thi->stop);
549 spin_unlock_irqrestore(&thi->t_lock, flags);
551 conn_info(tconn, "Terminating %s\n", current->comm);
553 /* Release mod reference taken when thread was started */
555 kref_put(&tconn->kref, &conn_destroy);
556 module_put(THIS_MODULE);
560 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
561 int (*func) (struct drbd_thread *), char *name)
563 spin_lock_init(&thi->t_lock);
566 thi->function = func;
568 strncpy(thi->name, name, ARRAY_SIZE(thi->name));
571 int drbd_thread_start(struct drbd_thread *thi)
573 struct drbd_tconn *tconn = thi->tconn;
574 struct task_struct *nt;
577 /* is used from state engine doing drbd_thread_stop_nowait,
578 * while holding the req lock irqsave */
579 spin_lock_irqsave(&thi->t_lock, flags);
581 switch (thi->t_state) {
583 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
584 thi->name, current->comm, current->pid);
586 /* Get ref on module for thread - this is released when thread exits */
587 if (!try_module_get(THIS_MODULE)) {
588 conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
589 spin_unlock_irqrestore(&thi->t_lock, flags);
593 kref_get(&thi->tconn->kref);
595 init_completion(&thi->stop);
596 thi->reset_cpu_mask = 1;
597 thi->t_state = RUNNING;
598 spin_unlock_irqrestore(&thi->t_lock, flags);
599 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
601 nt = kthread_create(drbd_thread_setup, (void *) thi,
602 "drbd_%c_%s", thi->name[0], thi->tconn->name);
605 conn_err(tconn, "Couldn't start thread\n");
607 kref_put(&tconn->kref, &conn_destroy);
608 module_put(THIS_MODULE);
611 spin_lock_irqsave(&thi->t_lock, flags);
613 thi->t_state = RUNNING;
614 spin_unlock_irqrestore(&thi->t_lock, flags);
618 thi->t_state = RESTARTING;
619 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
620 thi->name, current->comm, current->pid);
625 spin_unlock_irqrestore(&thi->t_lock, flags);
633 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
637 enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
639 /* may be called from state engine, holding the req lock irqsave */
640 spin_lock_irqsave(&thi->t_lock, flags);
642 if (thi->t_state == NONE) {
643 spin_unlock_irqrestore(&thi->t_lock, flags);
645 drbd_thread_start(thi);
649 if (thi->t_state != ns) {
650 if (thi->task == NULL) {
651 spin_unlock_irqrestore(&thi->t_lock, flags);
657 init_completion(&thi->stop);
658 if (thi->task != current)
659 force_sig(DRBD_SIGKILL, thi->task);
662 spin_unlock_irqrestore(&thi->t_lock, flags);
665 wait_for_completion(&thi->stop);
668 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
670 struct drbd_thread *thi =
671 task == tconn->receiver.task ? &tconn->receiver :
672 task == tconn->asender.task ? &tconn->asender :
673 task == tconn->worker.task ? &tconn->worker : NULL;
678 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
680 struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
681 return thi ? thi->name : task->comm;
684 int conn_lowest_minor(struct drbd_tconn *tconn)
686 struct drbd_conf *mdev;
690 mdev = idr_get_next(&tconn->volumes, &vnr);
691 m = mdev ? mdev_to_minor(mdev) : -1;
699 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
700 * @tconn: DRBD connection.
702 * Forces all threads of a connection onto the same CPU. This is beneficial for
703 * DRBD's performance. May be overridden by the user's configuration.
705 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
710 if (cpumask_weight(tconn->cpu_mask))
713 ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
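/* Worked example (illustrative): with 4 CPUs online, a connection whose
 * lowest minor is 5 gets ord = 5 % 4 = 1, so its threads are all pinned
 * to the second online CPU. */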
714 for_each_online_cpu(cpu) {
716 cpumask_set_cpu(cpu, tconn->cpu_mask);
720 /* should not be reached */
721 cpumask_setall(tconn->cpu_mask);
725 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
726 * @mdev: DRBD device.
727 * @thi: drbd_thread object
729 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
732 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
734 struct task_struct *p = current;
736 if (!thi->reset_cpu_mask)
738 thi->reset_cpu_mask = 0;
739 set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
744 * drbd_header_size - size of a packet header
746 * The header size is a multiple of 8, so any payload following the header is
747 * word aligned on 64-bit architectures. (The bitmap send and receive code relies on this.)
750 unsigned int drbd_header_size(struct drbd_tconn *tconn)
752 if (tconn->agreed_pro_version >= 100) {
753 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
754 return sizeof(struct p_header100);
756 BUILD_BUG_ON(sizeof(struct p_header80) !=
757 sizeof(struct p_header95));
758 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
759 return sizeof(struct p_header80);
763 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
765 h->magic = cpu_to_be32(DRBD_MAGIC);
766 h->command = cpu_to_be16(cmd);
767 h->length = cpu_to_be16(size);
768 return sizeof(struct p_header80);
771 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
773 h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
774 h->command = cpu_to_be16(cmd);
775 h->length = cpu_to_be32(size);
776 return sizeof(struct p_header95);
779 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
782 h->magic = cpu_to_be32(DRBD_MAGIC_100);
783 h->volume = cpu_to_be16(vnr);
784 h->command = cpu_to_be16(cmd);
785 h->length = cpu_to_be32(size);
787 return sizeof(struct p_header100);
790 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
791 void *buffer, enum drbd_packet cmd, int size)
793 if (tconn->agreed_pro_version >= 100)
794 return prepare_header100(buffer, cmd, size, vnr);
795 else if (tconn->agreed_pro_version >= 95 &&
796 size > DRBD_MAX_SIZE_H80_PACKET)
797 return prepare_header95(buffer, cmd, size);
799 return prepare_header80(buffer, cmd, size);
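/* Summary of the header dispatch above (as the code reads): peers speaking
 * protocol >= 100 always get the p_header100 format, which also carries the
 * volume number; peers at 95..99 get p_header95 only for payloads larger than
 * DRBD_MAX_SIZE_H80_PACKET, i.e. too big for the 16-bit length field of
 * p_header80; everything else, including all pre-95 peers, gets p_header80. */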
802 static void *__conn_prepare_command(struct drbd_tconn *tconn,
803 struct drbd_socket *sock)
807 return sock->sbuf + drbd_header_size(tconn);
810 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
814 mutex_lock(&sock->mutex);
815 p = __conn_prepare_command(tconn, sock);
817 mutex_unlock(&sock->mutex);
822 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
824 return conn_prepare_command(mdev->tconn, sock);
827 static int __send_command(struct drbd_tconn *tconn, int vnr,
828 struct drbd_socket *sock, enum drbd_packet cmd,
829 unsigned int header_size, void *data,
836 * Called with @data == NULL and the size of the data blocks in @size
837 * for commands that send data blocks. For those commands, omit the
838 * MSG_MORE flag: this will increase the likelihood that data blocks
839 * which are page aligned on the sender will end up page aligned on the receiving side.
842 msg_flags = data ? MSG_MORE : 0;
844 header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
846 err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
849 err = drbd_send_all(tconn, sock->socket, data, size, 0);
853 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
854 enum drbd_packet cmd, unsigned int header_size,
855 void *data, unsigned int size)
857 return __send_command(tconn, 0, sock, cmd, header_size, data, size);
860 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
861 enum drbd_packet cmd, unsigned int header_size,
862 void *data, unsigned int size)
866 err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
867 mutex_unlock(&sock->mutex);
871 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
872 enum drbd_packet cmd, unsigned int header_size,
873 void *data, unsigned int size)
877 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
879 mutex_unlock(&sock->mutex);
883 int drbd_send_ping(struct drbd_tconn *tconn)
885 struct drbd_socket *sock;
888 if (!conn_prepare_command(tconn, sock))
890 return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
893 int drbd_send_ping_ack(struct drbd_tconn *tconn)
895 struct drbd_socket *sock;
898 if (!conn_prepare_command(tconn, sock))
900 return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
903 int drbd_send_sync_param(struct drbd_conf *mdev)
905 struct drbd_socket *sock;
906 struct p_rs_param_95 *p;
908 const int apv = mdev->tconn->agreed_pro_version;
909 enum drbd_packet cmd;
911 struct disk_conf *dc;
913 sock = &mdev->tconn->data;
914 p = drbd_prepare_command(mdev, sock);
919 nc = rcu_dereference(mdev->tconn->net_conf);
921 size = apv <= 87 ? sizeof(struct p_rs_param)
922 : apv == 88 ? sizeof(struct p_rs_param)
923 + strlen(nc->verify_alg) + 1
924 : apv <= 94 ? sizeof(struct p_rs_param_89)
925 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
927 cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
929 /* initialize verify_alg and csums_alg */
930 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
932 if (get_ldev(mdev)) {
933 dc = rcu_dereference(mdev->ldev->disk_conf);
934 p->resync_rate = cpu_to_be32(dc->resync_rate);
935 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
936 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
937 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
938 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
941 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
942 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
943 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
944 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
945 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
949 strcpy(p->verify_alg, nc->verify_alg);
951 strcpy(p->csums_alg, nc->csums_alg);
954 return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
957 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
959 struct drbd_socket *sock;
960 struct p_protocol *p;
965 p = __conn_prepare_command(tconn, sock);
970 nc = rcu_dereference(tconn->net_conf);
972 if (nc->tentative && tconn->agreed_pro_version < 92) {
974 mutex_unlock(&sock->mutex);
975 conn_err(tconn, "--dry-run is not supported by peer");
980 if (tconn->agreed_pro_version >= 87)
981 size += strlen(nc->integrity_alg) + 1;
983 p->protocol = cpu_to_be32(nc->wire_protocol);
984 p->after_sb_0p = cpu_to_be32(nc->after_sb_0p);
985 p->after_sb_1p = cpu_to_be32(nc->after_sb_1p);
986 p->after_sb_2p = cpu_to_be32(nc->after_sb_2p);
987 p->two_primaries = cpu_to_be32(nc->two_primaries);
989 if (nc->discard_my_data)
990 cf |= CF_DISCARD_MY_DATA;
993 p->conn_flags = cpu_to_be32(cf);
995 if (tconn->agreed_pro_version >= 87)
996 strcpy(p->integrity_alg, nc->integrity_alg);
999 return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1002 int drbd_send_protocol(struct drbd_tconn *tconn)
1006 mutex_lock(&tconn->data.mutex);
1007 err = __drbd_send_protocol(tconn, P_PROTOCOL);
1008 mutex_unlock(&tconn->data.mutex);
1013 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1015 struct drbd_socket *sock;
1019 if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1022 sock = &mdev->tconn->data;
1023 p = drbd_prepare_command(mdev, sock);
1028 for (i = UI_CURRENT; i < UI_SIZE; i++)
1029 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1031 mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1032 p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1034 uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1036 uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1037 uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1038 p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1041 return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1044 int drbd_send_uuids(struct drbd_conf *mdev)
1046 return _drbd_send_uuids(mdev, 0);
1049 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1051 return _drbd_send_uuids(mdev, 8);
1054 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1056 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1057 u64 *uuid = mdev->ldev->md.uuid;
1058 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1060 (unsigned long long)uuid[UI_CURRENT],
1061 (unsigned long long)uuid[UI_BITMAP],
1062 (unsigned long long)uuid[UI_HISTORY_START],
1063 (unsigned long long)uuid[UI_HISTORY_END]);
1066 dev_info(DEV, "%s effective data uuid: %016llX\n",
1068 (unsigned long long)mdev->ed_uuid);
1072 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1074 struct drbd_socket *sock;
1075 struct p_rs_uuid *p;
1078 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1080 uuid = mdev->ldev->md.uuid[UI_BITMAP];
1081 if (uuid && uuid != UUID_JUST_CREATED)
1082 uuid = uuid + UUID_NEW_BM_OFFSET;
1084 get_random_bytes(&uuid, sizeof(u64));
1085 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1086 drbd_print_uuids(mdev, "updated sync UUID");
1089 sock = &mdev->tconn->data;
1090 p = drbd_prepare_command(mdev, sock);
1092 p->uuid = cpu_to_be64(uuid);
1093 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1097 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1099 struct drbd_socket *sock;
1101 sector_t d_size, u_size;
1102 int q_order_type, max_bio_size;
1104 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1105 D_ASSERT(mdev->ldev->backing_bdev);
1106 d_size = drbd_get_max_capacity(mdev->ldev);
1108 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1110 q_order_type = drbd_queue_order_type(mdev);
1111 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1112 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1117 q_order_type = QUEUE_ORDERED_NONE;
1118 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1121 sock = &mdev->tconn->data;
1122 p = drbd_prepare_command(mdev, sock);
1126 if (mdev->tconn->agreed_pro_version <= 94)
1127 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1128 else if (mdev->tconn->agreed_pro_version < 100)
1129 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1131 p->d_size = cpu_to_be64(d_size);
1132 p->u_size = cpu_to_be64(u_size);
1133 p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1134 p->max_bio_size = cpu_to_be32(max_bio_size);
1135 p->queue_order_type = cpu_to_be16(q_order_type);
1136 p->dds_flags = cpu_to_be16(flags);
1137 return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1141 * drbd_send_current_state() - Sends the drbd state to the peer
1142 * @mdev: DRBD device.
1144 int drbd_send_current_state(struct drbd_conf *mdev)
1146 struct drbd_socket *sock;
1149 sock = &mdev->tconn->data;
1150 p = drbd_prepare_command(mdev, sock);
1153 p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1154 return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1158 * drbd_send_state() - After a state change, sends the new state to the peer
1159 * @mdev: DRBD device.
1160 * @state: the state to send, not necessarily the current state.
1162 * Each state change queues an "after_state_ch" work, which will eventually
1163 * send the resulting new state to the peer. If more state changes happen
1164 * between queuing and processing of the after_state_ch work, we still
1165 * want to send each intermediary state in the order it occurred.
1167 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
1169 struct drbd_socket *sock;
1172 sock = &mdev->tconn->data;
1173 p = drbd_prepare_command(mdev, sock);
1176 p->state = cpu_to_be32(state.i); /* Within the send mutex */
1177 return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1180 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1182 struct drbd_socket *sock;
1183 struct p_req_state *p;
1185 sock = &mdev->tconn->data;
1186 p = drbd_prepare_command(mdev, sock);
1189 p->mask = cpu_to_be32(mask.i);
1190 p->val = cpu_to_be32(val.i);
1191 return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1194 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1196 enum drbd_packet cmd;
1197 struct drbd_socket *sock;
1198 struct p_req_state *p;
1200 cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1201 sock = &tconn->data;
1202 p = conn_prepare_command(tconn, sock);
1205 p->mask = cpu_to_be32(mask.i);
1206 p->val = cpu_to_be32(val.i);
1207 return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1210 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1212 struct drbd_socket *sock;
1213 struct p_req_state_reply *p;
1215 sock = &mdev->tconn->meta;
1216 p = drbd_prepare_command(mdev, sock);
1218 p->retcode = cpu_to_be32(retcode);
1219 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1223 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1225 struct drbd_socket *sock;
1226 struct p_req_state_reply *p;
1227 enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1229 sock = &tconn->meta;
1230 p = conn_prepare_command(tconn, sock);
1232 p->retcode = cpu_to_be32(retcode);
1233 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1237 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1239 BUG_ON(code & ~0xf);
1240 p->encoding = (p->encoding & ~0xf) | code;
1243 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1245 p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1248 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1251 p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1254 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1255 struct p_compressed_bm *p,
1257 struct bm_xfer_ctx *c)
1259 struct bitstream bs;
1260 unsigned long plain_bits;
1267 /* may we use this feature? */
1269 use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1271 if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1274 if (c->bit_offset >= c->bm_bits)
1275 return 0; /* nothing to do. */
1277 /* use at most this many bytes */
1278 bitstream_init(&bs, p->code, size, 0);
1279 memset(p->code, 0, size);
1280 /* plain bits covered in this code string */
1283 /* p->encoding & 0x80 stores whether the first run length is set.
1284 * bit offset is implicit.
1285 * start with toggle == 2 to be able to tell the first iteration */
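/* Worked example (illustrative): for bitmap bits 00000111001... the runs are
 * 5 zero bits, 3 set bits, 2 zero bits, ...; the code below emits
 * dcbp_set_start(p, 0) followed by the VLI-encoded run lengths 5, 3, 2, ...
 * Had the bitmap started with set bits, dcbp_set_start(p, 1) would be emitted
 * and no leading zero-length run would be encoded. */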
1288 /* see how many plain bits we can stuff into one packet
1289 * using RLE and VLI. */
1291 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1292 : _drbd_bm_find_next(mdev, c->bit_offset);
1295 rl = tmp - c->bit_offset;
1297 if (toggle == 2) { /* first iteration */
1299 /* the first checked bit was set,
1300 * store start value, */
1301 dcbp_set_start(p, 1);
1302 /* but skip encoding of zero run length */
1306 dcbp_set_start(p, 0);
1309 /* paranoia: catch zero runlength.
1310 * can only happen if bitmap is modified while we scan it. */
1312 dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1313 "t:%u bo:%lu\n", toggle, c->bit_offset);
1317 bits = vli_encode_bits(&bs, rl);
1318 if (bits == -ENOBUFS) /* buffer full */
1321 dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1327 c->bit_offset = tmp;
1328 } while (c->bit_offset < c->bm_bits);
1330 len = bs.cur.b - p->code + !!bs.cur.bit;
1332 if (plain_bits < (len << 3)) {
1333 /* incompressible with this method.
1334 * we need to rewind both word and bit position. */
1335 c->bit_offset -= plain_bits;
1336 bm_xfer_ctx_bit_to_word_offset(c);
1337 c->bit_offset = c->word_offset * BITS_PER_LONG;
1341 /* RLE + VLI was able to compress it just fine.
1342 * update c->word_offset. */
1343 bm_xfer_ctx_bit_to_word_offset(c);
1345 /* store pad_bits */
1346 dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1352 * send_bitmap_rle_or_plain
1354 * Return 0 when done, 1 when another iteration is needed, and a negative error
1355 * code upon failure.
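 *
 * Callers are expected to loop while the return value is positive, roughly
 * (a sketch of the caller side, as in _drbd_send_bitmap() below):
 *
 *	do {
 *		err = send_bitmap_rle_or_plain(mdev, &c);
 *	} while (err > 0);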
1358 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1360 struct drbd_socket *sock = &mdev->tconn->data;
1361 unsigned int header_size = drbd_header_size(mdev->tconn);
1362 struct p_compressed_bm *p = sock->sbuf + header_size;
1365 len = fill_bitmap_rle_bits(mdev, p,
1366 DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1371 dcbp_set_code(p, RLE_VLI_Bits);
1372 err = __send_command(mdev->tconn, mdev->vnr, sock,
1373 P_COMPRESSED_BITMAP, sizeof(*p) + len,
1376 c->bytes[0] += header_size + sizeof(*p) + len;
1378 if (c->bit_offset >= c->bm_bits)
1381 /* was not compressible.
1382 * send a buffer full of plain text bits instead. */
1383 unsigned int data_size;
1384 unsigned long num_words;
1385 unsigned long *p = sock->sbuf + header_size;
1387 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1388 num_words = min_t(size_t, data_size / sizeof(*p),
1389 c->bm_words - c->word_offset);
1390 len = num_words * sizeof(*p);
1392 drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1393 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1394 c->word_offset += num_words;
1395 c->bit_offset = c->word_offset * BITS_PER_LONG;
1398 c->bytes[1] += header_size + len;
1400 if (c->bit_offset > c->bm_bits)
1401 c->bit_offset = c->bm_bits;
1405 INFO_bm_xfer_stats(mdev, "send", c);
1413 /* See the comment at receive_bitmap() */
1414 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1416 struct bm_xfer_ctx c;
1419 if (!expect(mdev->bitmap))
1422 if (get_ldev(mdev)) {
1423 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1424 dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1425 drbd_bm_set_all(mdev);
1426 if (drbd_bm_write(mdev)) {
1427 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1428 * but otherwise process as per normal - need to tell other
1429 * side that a full resync is required! */
1430 dev_err(DEV, "Failed to write bitmap to disk!\n");
1432 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1439 c = (struct bm_xfer_ctx) {
1440 .bm_bits = drbd_bm_bits(mdev),
1441 .bm_words = drbd_bm_words(mdev),
1445 err = send_bitmap_rle_or_plain(mdev, &c);
1451 int drbd_send_bitmap(struct drbd_conf *mdev)
1453 struct drbd_socket *sock = &mdev->tconn->data;
1456 mutex_lock(&sock->mutex);
1458 err = !_drbd_send_bitmap(mdev);
1459 mutex_unlock(&sock->mutex);
1463 void drbd_send_b_ack(struct drbd_tconn *tconn, u32 barrier_nr, u32 set_size)
1465 struct drbd_socket *sock;
1466 struct p_barrier_ack *p;
1468 if (tconn->cstate < C_WF_REPORT_PARAMS)
1471 sock = &tconn->meta;
1472 p = conn_prepare_command(tconn, sock);
1475 p->barrier = barrier_nr;
1476 p->set_size = cpu_to_be32(set_size);
1477 conn_send_command(tconn, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1481 * _drbd_send_ack() - Sends an ack packet
1482 * @mdev: DRBD device.
1483 * @cmd: Packet command code.
1484 * @sector: sector, needs to be in big endian byte order
1485 * @blksize: size in bytes, needs to be in big endian byte order
1486 * @block_id: Id, big endian byte order
1488 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1489 u64 sector, u32 blksize, u64 block_id)
1491 struct drbd_socket *sock;
1492 struct p_block_ack *p;
1494 if (mdev->state.conn < C_CONNECTED)
1497 sock = &mdev->tconn->meta;
1498 p = drbd_prepare_command(mdev, sock);
1502 p->block_id = block_id;
1503 p->blksize = blksize;
1504 p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1505 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1508 /* dp->sector and dp->block_id already/still in network byte order,
1509 * data_size is payload size according to dp->head,
1510 * and may need to be corrected for digest size. */
1511 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1512 struct p_data *dp, int data_size)
1514 if (mdev->tconn->peer_integrity_tfm)
1515 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1516 _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1520 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1521 struct p_block_req *rp)
1523 _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1527 * drbd_send_ack() - Sends an ack packet
1528 * @mdev: DRBD device
1529 * @cmd: packet command code
1530 * @peer_req: peer request
1532 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1533 struct drbd_peer_request *peer_req)
1535 return _drbd_send_ack(mdev, cmd,
1536 cpu_to_be64(peer_req->i.sector),
1537 cpu_to_be32(peer_req->i.size),
1538 peer_req->block_id);
1541 /* This function misuses the block_id field to signal if the blocks
1542 * are in sync or not. */
1543 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1544 sector_t sector, int blksize, u64 block_id)
1546 return _drbd_send_ack(mdev, cmd,
1547 cpu_to_be64(sector),
1548 cpu_to_be32(blksize),
1549 cpu_to_be64(block_id));
1552 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1553 sector_t sector, int size, u64 block_id)
1555 struct drbd_socket *sock;
1556 struct p_block_req *p;
1558 sock = &mdev->tconn->data;
1559 p = drbd_prepare_command(mdev, sock);
1562 p->sector = cpu_to_be64(sector);
1563 p->block_id = block_id;
1564 p->blksize = cpu_to_be32(size);
1565 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1568 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1569 void *digest, int digest_size, enum drbd_packet cmd)
1571 struct drbd_socket *sock;
1572 struct p_block_req *p;
1574 /* FIXME: Put the digest into the preallocated socket buffer. */
1576 sock = &mdev->tconn->data;
1577 p = drbd_prepare_command(mdev, sock);
1580 p->sector = cpu_to_be64(sector);
1581 p->block_id = ID_SYNCER /* unused */;
1582 p->blksize = cpu_to_be32(size);
1583 return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1584 digest, digest_size);
1587 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1589 struct drbd_socket *sock;
1590 struct p_block_req *p;
1592 sock = &mdev->tconn->data;
1593 p = drbd_prepare_command(mdev, sock);
1596 p->sector = cpu_to_be64(sector);
1597 p->block_id = ID_SYNCER /* unused */;
1598 p->blksize = cpu_to_be32(size);
1599 return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1602 /* called on sndtimeo
1603 * returns false if we should retry,
1604 * true if we think connection is dead
1606 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1609 /* long elapsed = (long)(jiffies - mdev->last_received); */
1611 drop_it = tconn->meta.socket == sock
1612 || !tconn->asender.task
1613 || get_t_state(&tconn->asender) != RUNNING
1614 || tconn->cstate < C_WF_REPORT_PARAMS;
1619 drop_it = !--tconn->ko_count;
1621 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1622 current->comm, current->pid, tconn->ko_count);
1623 request_ping(tconn);
1626 return drop_it; /* && (mdev->state == R_PRIMARY) */;
1629 static void drbd_update_congested(struct drbd_tconn *tconn)
1631 struct sock *sk = tconn->data.socket->sk;
1632 if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1633 set_bit(NET_CONGESTED, &tconn->flags);
1636 /* The idea of sendpage seems to be to put some kind of reference
1637 * to the page into the skb, and to hand it over to the NIC. In
1638 * this process get_page() gets called.
1640 * As soon as the page was really sent over the network put_page()
1641 * gets called by some part of the network layer. [ NIC driver? ]
1643 * [ get_page() / put_page() increment/decrement the count. If count
1644 * reaches 0 the page will be freed. ]
1646 * This works nicely with pages from FSs.
1647 * But this means that in protocol A we might signal IO completion too early!
1649 * In order not to corrupt data during a resync we must make sure
1650 * that we do not reuse our own buffer pages (EEs) too early, therefore
1651 * we have the net_ee list.
1653 * XFS seems to have problems, still, it submits pages with page_count == 0!
1654 * As a workaround, we disable sendpage on pages
1655 * with page_count == 0 or PageSlab.
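 *
 * In short (summarizing the above): a page is handed to sendpage() for
 * zero-copy transmission only when it is safe to take a reference on it;
 * otherwise, or when disable_sendpage is set, we fall back to copying the
 * data into the socket via _drbd_no_send_page() below.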
1657 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1658 int offset, size_t size, unsigned msg_flags)
1660 struct socket *socket;
1664 socket = mdev->tconn->data.socket;
1665 addr = kmap(page) + offset;
1666 err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1669 mdev->send_cnt += size >> 9;
1673 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1674 int offset, size_t size, unsigned msg_flags)
1676 struct socket *socket = mdev->tconn->data.socket;
1677 mm_segment_t oldfs = get_fs();
1681 /* e.g. XFS meta- & log-data is in slab pages, which have a
1682 * page_count of 0 and/or have PageSlab() set.
1683 * we cannot use send_page for those, as that does get_page();
1684 * put_page(); and would cause either a VM_BUG directly, or
1685 * __page_cache_release a page that would actually still be referenced
1686 * by someone, leading to some obscure delayed Oops somewhere else. */
1687 if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1688 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1690 msg_flags |= MSG_NOSIGNAL;
1691 drbd_update_congested(mdev->tconn);
1696 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1698 if (sent == -EAGAIN) {
1699 if (we_should_drop_the_connection(mdev->tconn, socket))
1703 dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1704 __func__, (int)size, len, sent);
1711 } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1713 clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1717 mdev->send_cnt += size >> 9;
1722 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1724 struct bio_vec *bvec;
1726 /* hint all but last page with MSG_MORE */
1727 bio_for_each_segment(bvec, bio, i) {
1730 err = _drbd_no_send_page(mdev, bvec->bv_page,
1731 bvec->bv_offset, bvec->bv_len,
1732 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1739 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1741 struct bio_vec *bvec;
1743 /* hint all but last page with MSG_MORE */
1744 bio_for_each_segment(bvec, bio, i) {
1747 err = _drbd_send_page(mdev, bvec->bv_page,
1748 bvec->bv_offset, bvec->bv_len,
1749 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1756 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1757 struct drbd_peer_request *peer_req)
1759 struct page *page = peer_req->pages;
1760 unsigned len = peer_req->i.size;
1763 /* hint all but last page with MSG_MORE */
1764 page_chain_for_each(page) {
1765 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1767 err = _drbd_send_page(mdev, page, 0, l,
1768 page_chain_next(page) ? MSG_MORE : 0);
1776 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1778 if (mdev->tconn->agreed_pro_version >= 95)
1779 return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1780 (bi_rw & REQ_FUA ? DP_FUA : 0) |
1781 (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1782 (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1784 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1787 /* Used to send write requests
1788 * R_PRIMARY -> Peer (P_DATA)
1790 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1792 struct drbd_socket *sock;
1794 unsigned int dp_flags = 0;
1798 sock = &mdev->tconn->data;
1799 p = drbd_prepare_command(mdev, sock);
1800 dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1804 p->sector = cpu_to_be64(req->i.sector);
1805 p->block_id = (unsigned long)req;
1806 p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1807 dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1808 if (mdev->state.conn >= C_SYNC_SOURCE &&
1809 mdev->state.conn <= C_PAUSED_SYNC_T)
1810 dp_flags |= DP_MAY_SET_IN_SYNC;
1811 if (mdev->tconn->agreed_pro_version >= 100) {
1812 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1813 dp_flags |= DP_SEND_RECEIVE_ACK;
1814 if (req->rq_state & RQ_EXP_WRITE_ACK)
1815 dp_flags |= DP_SEND_WRITE_ACK;
1817 p->dp_flags = cpu_to_be32(dp_flags);
1819 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1820 err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1822 /* For protocol A, we have to memcpy the payload into
1823 * socket buffers, as we may complete right away
1824 * as soon as we handed it over to tcp, at which point the data
1825 * pages may become invalid.
1827 * With data integrity enabled, we copy it as well, so we can be
1828 * sure that even if the bio pages may still be modified, it
1829 * won't change the data on the wire; thus, if the digest checks
1830 * out ok after sending on this side, but does not match on the
1831 * receiving side, we have surely detected corruption elsewhere.
1833 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1834 err = _drbd_send_bio(mdev, req->master_bio);
1836 err = _drbd_send_zc_bio(mdev, req->master_bio);
1838 /* double check digest, sometimes buffers have been modified in flight. */
1839 if (dgs > 0 && dgs <= 64) {
1840 /* 64 byte, 512 bit, is the largest digest size
1841 * currently supported in kernel crypto. */
1842 unsigned char digest[64];
1843 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1844 if (memcmp(p + 1, digest, dgs)) {
1846 "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1847 (unsigned long long)req->i.sector, req->i.size);
1849 } /* else if (dgs > 64) {
1850 ... Be noisy about digest too large ...
1853 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1858 /* answer packet, used to send data back for read requests:
1859 * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY)
1860 * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
1862 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1863 struct drbd_peer_request *peer_req)
1865 struct drbd_socket *sock;
1870 sock = &mdev->tconn->data;
1871 p = drbd_prepare_command(mdev, sock);
1873 dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1877 p->sector = cpu_to_be64(peer_req->i.sector);
1878 p->block_id = peer_req->block_id;
1879 p->seq_num = 0; /* unused */
1882 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1883 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1885 err = _drbd_send_zc_ee(mdev, peer_req);
1886 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1891 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1893 struct drbd_socket *sock;
1894 struct p_block_desc *p;
1896 sock = &mdev->tconn->data;
1897 p = drbd_prepare_command(mdev, sock);
1900 p->sector = cpu_to_be64(req->i.sector);
1901 p->blksize = cpu_to_be32(req->i.size);
1902 return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
  drbd_send distinguishes two cases:

  Packets sent via the data socket "sock"
  and packets sent via the meta data socket "msock":

                     sock                      msock
  -----------------+-------------------------+------------------------------
  timeout           conf.timeout / 2          conf.timeout / 2
  timeout action    send a ping via msock     Abort communication
                                              and close all sockets
1919 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1921 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1922 void *buf, size_t size, unsigned msg_flags)
1931 /* THINK if (signal_pending) return ... ? */
1936 msg.msg_name = NULL;
1937 msg.msg_namelen = 0;
1938 msg.msg_control = NULL;
1939 msg.msg_controllen = 0;
1940 msg.msg_flags = msg_flags | MSG_NOSIGNAL;
1942 if (sock == tconn->data.socket) {
1944 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1946 drbd_update_congested(tconn);
1950 * tcp_sendmsg does _not_ use its size parameter at all ?
1952 * -EAGAIN on timeout, -EINTR on signal.
1955 * do we need to block DRBD_SIG if sock == &meta.socket ??
1956 * otherwise wake_asender() might interrupt some send_*Ack !
1958 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1959 if (rv == -EAGAIN) {
1960 if (we_should_drop_the_connection(tconn, sock))
1966 flush_signals(current);
1974 } while (sent < size);
1976 if (sock == tconn->data.socket)
1977 clear_bit(NET_CONGESTED, &tconn->flags);
1980 if (rv != -EAGAIN) {
1981 conn_err(tconn, "%s_sendmsg returned %d\n",
1982 sock == tconn->meta.socket ? "msock" : "sock",
1984 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1986 conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1993 * drbd_send_all - Send an entire buffer
1995 * Returns 0 upon success and a negative error value otherwise.
1997 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1998 size_t size, unsigned msg_flags)
2002 err = drbd_send(tconn, sock, buffer, size, msg_flags);
2010 static int drbd_open(struct block_device *bdev, fmode_t mode)
2012 struct drbd_conf *mdev = bdev->bd_disk->private_data;
2013 unsigned long flags;
2016 mutex_lock(&drbd_main_mutex);
2017 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2018 /* to have a stable mdev->state.role
2019 * and no race with updating open_cnt */
2021 if (mdev->state.role != R_PRIMARY) {
2022 if (mode & FMODE_WRITE)
2024 else if (!allow_oos)
2030 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2031 mutex_unlock(&drbd_main_mutex);
2036 static int drbd_release(struct gendisk *gd, fmode_t mode)
2038 struct drbd_conf *mdev = gd->private_data;
2039 mutex_lock(&drbd_main_mutex);
2041 mutex_unlock(&drbd_main_mutex);
2045 static void drbd_set_defaults(struct drbd_conf *mdev)
2047 /* Beware! The actual layout differs
2048 * between big endian and little endian */
2049 mdev->state = (union drbd_dev_state) {
2050 { .role = R_SECONDARY,
2052 .conn = C_STANDALONE,
2058 void drbd_init_set_defaults(struct drbd_conf *mdev)
2060 /* the memset(,0,) did most of this.
2061 * note: only assignments, no allocation in here */
2063 drbd_set_defaults(mdev);
2065 atomic_set(&mdev->ap_bio_cnt, 0);
2066 atomic_set(&mdev->ap_pending_cnt, 0);
2067 atomic_set(&mdev->rs_pending_cnt, 0);
2068 atomic_set(&mdev->unacked_cnt, 0);
2069 atomic_set(&mdev->local_cnt, 0);
2070 atomic_set(&mdev->pp_in_use_by_net, 0);
2071 atomic_set(&mdev->rs_sect_in, 0);
2072 atomic_set(&mdev->rs_sect_ev, 0);
2073 atomic_set(&mdev->ap_in_flight, 0);
2074 atomic_set(&mdev->md_io_in_use, 0);
2076 mutex_init(&mdev->own_state_mutex);
2077 mdev->state_mutex = &mdev->own_state_mutex;
2079 spin_lock_init(&mdev->al_lock);
2080 spin_lock_init(&mdev->peer_seq_lock);
2082 INIT_LIST_HEAD(&mdev->active_ee);
2083 INIT_LIST_HEAD(&mdev->sync_ee);
2084 INIT_LIST_HEAD(&mdev->done_ee);
2085 INIT_LIST_HEAD(&mdev->read_ee);
2086 INIT_LIST_HEAD(&mdev->net_ee);
2087 INIT_LIST_HEAD(&mdev->resync_reads);
2088 INIT_LIST_HEAD(&mdev->resync_work.list);
2089 INIT_LIST_HEAD(&mdev->unplug_work.list);
2090 INIT_LIST_HEAD(&mdev->go_diskless.list);
2091 INIT_LIST_HEAD(&mdev->md_sync_work.list);
2092 INIT_LIST_HEAD(&mdev->start_resync_work.list);
2093 INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2095 mdev->resync_work.cb = w_resync_timer;
2096 mdev->unplug_work.cb = w_send_write_hint;
2097 mdev->go_diskless.cb = w_go_diskless;
2098 mdev->md_sync_work.cb = w_md_sync;
2099 mdev->bm_io_work.w.cb = w_bitmap_io;
2100 mdev->start_resync_work.cb = w_start_resync;
2102 mdev->resync_work.mdev = mdev;
2103 mdev->unplug_work.mdev = mdev;
2104 mdev->go_diskless.mdev = mdev;
2105 mdev->md_sync_work.mdev = mdev;
2106 mdev->bm_io_work.w.mdev = mdev;
2107 mdev->start_resync_work.mdev = mdev;
2109 init_timer(&mdev->resync_timer);
2110 init_timer(&mdev->md_sync_timer);
2111 init_timer(&mdev->start_resync_timer);
2112 init_timer(&mdev->request_timer);
2113 mdev->resync_timer.function = resync_timer_fn;
2114 mdev->resync_timer.data = (unsigned long) mdev;
2115 mdev->md_sync_timer.function = md_sync_timer_fn;
2116 mdev->md_sync_timer.data = (unsigned long) mdev;
2117 mdev->start_resync_timer.function = start_resync_timer_fn;
2118 mdev->start_resync_timer.data = (unsigned long) mdev;
2119 mdev->request_timer.function = request_timer_fn;
2120 mdev->request_timer.data = (unsigned long) mdev;
2122 init_waitqueue_head(&mdev->misc_wait);
2123 init_waitqueue_head(&mdev->state_wait);
2124 init_waitqueue_head(&mdev->ee_wait);
2125 init_waitqueue_head(&mdev->al_wait);
2126 init_waitqueue_head(&mdev->seq_wait);
2128 mdev->resync_wenr = LC_FREE;
2129 mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2130 mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2133 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2136 if (mdev->tconn->receiver.t_state != NONE)
2137 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2138 mdev->tconn->receiver.t_state);
2149 mdev->rs_failed = 0;
2150 mdev->rs_last_events = 0;
2151 mdev->rs_last_sect_ev = 0;
2152 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2153 mdev->rs_mark_left[i] = 0;
2154 mdev->rs_mark_time[i] = 0;
2156 D_ASSERT(mdev->tconn->net_conf == NULL);
2158 drbd_set_my_capacity(mdev, 0);
2160 /* maybe never allocated. */
2161 drbd_bm_resize(mdev, 0, 1);
2162 drbd_bm_cleanup(mdev);
2165 drbd_free_bc(mdev->ldev);
2168 clear_bit(AL_SUSPENDED, &mdev->flags);
2170 D_ASSERT(list_empty(&mdev->active_ee));
2171 D_ASSERT(list_empty(&mdev->sync_ee));
2172 D_ASSERT(list_empty(&mdev->done_ee));
2173 D_ASSERT(list_empty(&mdev->read_ee));
2174 D_ASSERT(list_empty(&mdev->net_ee));
2175 D_ASSERT(list_empty(&mdev->resync_reads));
2176 D_ASSERT(list_empty(&mdev->tconn->sender_work.q));
2177 D_ASSERT(list_empty(&mdev->resync_work.list));
2178 D_ASSERT(list_empty(&mdev->unplug_work.list));
2179 D_ASSERT(list_empty(&mdev->go_diskless.list));
2181 drbd_set_defaults(mdev);
2185 static void drbd_destroy_mempools(void)
2189 while (drbd_pp_pool) {
2190 page = drbd_pp_pool;
2191 drbd_pp_pool = (struct page *)page_private(page);
2196 /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2198 if (drbd_md_io_bio_set)
2199 bioset_free(drbd_md_io_bio_set);
2200 if (drbd_md_io_page_pool)
2201 mempool_destroy(drbd_md_io_page_pool);
2202 if (drbd_ee_mempool)
2203 mempool_destroy(drbd_ee_mempool);
2204 if (drbd_request_mempool)
2205 mempool_destroy(drbd_request_mempool);
2207 kmem_cache_destroy(drbd_ee_cache);
2208 if (drbd_request_cache)
2209 kmem_cache_destroy(drbd_request_cache);
2210 if (drbd_bm_ext_cache)
2211 kmem_cache_destroy(drbd_bm_ext_cache);
2212 if (drbd_al_ext_cache)
2213 kmem_cache_destroy(drbd_al_ext_cache);
2215 drbd_md_io_bio_set = NULL;
2216 drbd_md_io_page_pool = NULL;
2217 drbd_ee_mempool = NULL;
2218 drbd_request_mempool = NULL;
2219 drbd_ee_cache = NULL;
2220 drbd_request_cache = NULL;
2221 drbd_bm_ext_cache = NULL;
2222 drbd_al_ext_cache = NULL;
2227 static int drbd_create_mempools(void)
2230 const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
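/* Editorial note, sizing for illustration only: assuming DRBD_MAX_BIO_SIZE of
 * 1 MiB and 4 KiB pages (typical values, not guaranteed here), "number" comes
 * to 256 pages per configured minor, e.g. 8192 pages when minor_count is 32. */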
2233 /* prepare our caches and mempools */
2234 drbd_request_mempool = NULL;
2235 drbd_ee_cache = NULL;
2236 drbd_request_cache = NULL;
2237 drbd_bm_ext_cache = NULL;
2238 drbd_al_ext_cache = NULL;
2239 drbd_pp_pool = NULL;
2240 drbd_md_io_page_pool = NULL;
2241 drbd_md_io_bio_set = NULL;
2244 drbd_request_cache = kmem_cache_create(
2245 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2246 if (drbd_request_cache == NULL)
2249 drbd_ee_cache = kmem_cache_create(
2250 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2251 if (drbd_ee_cache == NULL)
2254 drbd_bm_ext_cache = kmem_cache_create(
2255 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2256 if (drbd_bm_ext_cache == NULL)
2259 drbd_al_ext_cache = kmem_cache_create(
2260 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2261 if (drbd_al_ext_cache == NULL)
2265 drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2266 if (drbd_md_io_bio_set == NULL)
2269 drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2270 if (drbd_md_io_page_pool == NULL)
2273 drbd_request_mempool = mempool_create(number,
2274 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2275 if (drbd_request_mempool == NULL)
2278 drbd_ee_mempool = mempool_create(number,
2279 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2280 if (drbd_ee_mempool == NULL)
2283 /* drbd's page pool */
2284 spin_lock_init(&drbd_pp_lock);
2286 for (i = 0; i < number; i++) {
2287 page = alloc_page(GFP_HIGHUSER);
2290 set_page_private(page, (unsigned long)drbd_pp_pool);
2291 drbd_pp_pool = page;
2293 drbd_pp_vacant = number;
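/* The pool built above is a simple LIFO free list: each free page stores the
 * pointer to the next free page in its page_private field, drbd_pp_pool
 * points at the head, and drbd_pp_vacant counts the free pages.
 * drbd_destroy_mempools() walks the same page_private chain to release them. */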
2298 drbd_destroy_mempools(); /* in case we allocated some */
2302 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2305 /* just so we have it. you never know what interesting things we
2306 * might want to do here some day...
2312 static struct notifier_block drbd_notifier = {
2313 .notifier_call = drbd_notify_sys,
2316 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2320 rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2322 dev_err(DEV, "%d EEs in active list found!\n", rr);
2324 rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2326 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2328 rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2330 dev_err(DEV, "%d EEs in read list found!\n", rr);
2332 rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2334 dev_err(DEV, "%d EEs in done list found!\n", rr);
2336 rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2338 dev_err(DEV, "%d EEs in net list found!\n", rr);
2341 /* caution. no locking. */
2342 void drbd_minor_destroy(struct kref *kref)
2344 struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2345 struct drbd_tconn *tconn = mdev->tconn;
2347 del_timer_sync(&mdev->request_timer);
2349 /* paranoia asserts */
2350 D_ASSERT(mdev->open_cnt == 0);
2351 /* end paranoia asserts */
2353 /* cleanup stuff that may have been allocated during
2354 * device (re-)configuration or state changes */
2356 if (mdev->this_bdev)
2357 bdput(mdev->this_bdev);
2359 drbd_free_bc(mdev->ldev);
2362 drbd_release_all_peer_reqs(mdev);
2364 lc_destroy(mdev->act_log);
2365 lc_destroy(mdev->resync);
2367 kfree(mdev->p_uuid);
2368 /* mdev->p_uuid = NULL; */
2370 if (mdev->bitmap) /* should no longer be there. */
2371 drbd_bm_cleanup(mdev);
2372 __free_page(mdev->md_io_page);
2373 put_disk(mdev->vdisk);
2374 blk_cleanup_queue(mdev->rq_queue);
2375 kfree(mdev->rs_plan_s);
2378 kref_put(&tconn->kref, &conn_destroy);
2381 /* One global retry thread, if we need to push back some bio and have it
2382 * reinserted through our make request function.
2384 static struct retry_worker {
2385 struct workqueue_struct *wq;
2386 struct work_struct worker;
2389 struct list_head writes;
2392 static void do_retry(struct work_struct *ws)
2394 struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
2396 struct drbd_request *req, *tmp;
2398 spin_lock_irq(&retry->lock);
2399 list_splice_init(&retry->writes, &writes);
2400 spin_unlock_irq(&retry->lock);
2402 list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
2403 struct drbd_conf *mdev = req->w.mdev;
2404 struct bio *bio = req->master_bio;
2405 unsigned long start_time = req->start_time;
2407 /* We have exclusive access to this request object.
2408 * If it had not been RQ_POSTPONED, the code path which queued
2409 * it here would have completed and freed it already.
2411 mempool_free(req, drbd_request_mempool);
2413 /* A single suspended or otherwise blocking device may stall
2414 * all others as well. Fortunately, this code path is to
2415 * recover from a situation that "should not happen":
2416 * concurrent writes in multi-primary setup.
2417 * In a "normal" lifecycle, this workqueue is supposed to be
2418 * destroyed without ever doing anything.
2419 * If it turns out to be an issue anyways, we can do per
2420 * resource (replication group) or per device (minor) retry
2421 * workqueues instead.
2424 /* We are not just doing generic_make_request(),
2425 * as we want to keep the start_time information. */
2428 } while (__drbd_make_request(mdev, bio, start_time));
2432 void drbd_restart_request(struct drbd_request *req)
2434 unsigned long flags;
2435 spin_lock_irqsave(&retry.lock, flags);
2436 list_move_tail(&req->tl_requests, &retry.writes);
2437 spin_unlock_irqrestore(&retry.lock, flags);
2439 /* Drop the extra reference that would otherwise
2440 * have been dropped by complete_master_bio.
2441 * do_retry() needs to grab a new one. */
2442 dec_ap_bio(req->w.mdev);
2444 queue_work(retry.wq, &retry.worker);
2448 static void drbd_cleanup(void)
2451 struct drbd_conf *mdev;
2452 struct drbd_tconn *tconn, *tmp;
2454 unregister_reboot_notifier(&drbd_notifier);
2456 /* first remove proc,
2457 * drbdsetup uses its presence to detect
2458 * whether DRBD is loaded.
2459 * If we got stuck in proc removal
2460 * while netlink was already deregistered,
2461 * some drbdsetup commands could wait forever
2465 remove_proc_entry("drbd", NULL);
2468 destroy_workqueue(retry.wq);
2470 drbd_genl_unregister();
2472 idr_for_each_entry(&minors, mdev, i) {
2473 idr_remove(&minors, mdev_to_minor(mdev));
2474 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2475 del_gendisk(mdev->vdisk);
2476 /* synchronize_rcu(); No other threads running at this point */
2477 kref_put(&mdev->kref, &drbd_minor_destroy);
2480 /* not _rcu since there is no other updater anymore; genl is already unregistered */
2481 list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2482 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2483 /* synchronize_rcu(); */
2484 kref_put(&tconn->kref, &conn_destroy);
2487 drbd_destroy_mempools();
2488 unregister_blkdev(DRBD_MAJOR, "drbd");
2490 idr_destroy(&minors);
2492 printk(KERN_INFO "drbd: module cleanup done.\n");
2496 * drbd_congested() - Callback for pdflush
2497 * @congested_data: User data
2498 * @bdi_bits: Bits pdflush is currently interested in
2500 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2502 static int drbd_congested(void *congested_data, int bdi_bits)
2504 struct drbd_conf *mdev = congested_data;
2505 struct request_queue *q;
2509 if (!may_inc_ap_bio(mdev)) {
2510 /* DRBD has frozen IO */
2516 if (get_ldev(mdev)) {
2517 q = bdev_get_queue(mdev->ldev->backing_bdev);
2518 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2524 if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2525 r |= (1 << BDI_async_congested);
2526 reason = reason == 'b' ? 'a' : 'n';
2530 mdev->congestion_reason = reason;
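/* Editorial note: judging from the ternary above, 'b' appears to mark
 * backing-device congestion, 'n' network congestion only, and 'a' both;
 * other codes may be recorded on paths not shown here. */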
2534 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2536 spin_lock_init(&wq->q_lock);
2537 INIT_LIST_HEAD(&wq->q);
2538 init_waitqueue_head(&wq->q_wait);
2541 struct drbd_tconn *conn_get_by_name(const char *name)
2543 struct drbd_tconn *tconn;
2545 if (!name || !name[0])
2549 list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2550 if (!strcmp(tconn->name, name)) {
2551 kref_get(&tconn->kref);
2561 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2562 void *peer_addr, int peer_addr_len)
2564 struct drbd_tconn *tconn;
2567 list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2568 if (tconn->my_addr_len == my_addr_len &&
2569 tconn->peer_addr_len == peer_addr_len &&
2570 !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2571 !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2572 kref_get(&tconn->kref);
2582 static int drbd_alloc_socket(struct drbd_socket *socket)
2584 socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2587 socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2593 static void drbd_free_socket(struct drbd_socket *socket)
2595 free_page((unsigned long) socket->sbuf);
2596 free_page((unsigned long) socket->rbuf);
2599 void conn_free_crypto(struct drbd_tconn *tconn)
2601 drbd_free_sock(tconn);
2603 crypto_free_hash(tconn->csums_tfm);
2604 crypto_free_hash(tconn->verify_tfm);
2605 crypto_free_hash(tconn->cram_hmac_tfm);
2606 crypto_free_hash(tconn->integrity_tfm);
2607 crypto_free_hash(tconn->peer_integrity_tfm);
2608 kfree(tconn->int_dig_in);
2609 kfree(tconn->int_dig_vv);
2611 tconn->csums_tfm = NULL;
2612 tconn->verify_tfm = NULL;
2613 tconn->cram_hmac_tfm = NULL;
2614 tconn->integrity_tfm = NULL;
2615 tconn->peer_integrity_tfm = NULL;
2616 tconn->int_dig_in = NULL;
2617 tconn->int_dig_vv = NULL;
2620 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2622 cpumask_var_t new_cpu_mask;
2625 if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2628 retcode = ERR_NOMEM;
2629 drbd_msg_put_info("unable to allocate cpumask");
2632 /* silently ignore cpu mask on UP kernel */
2633 if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2634 /* FIXME: Get rid of constant 32 here */
2635 err = bitmap_parse(res_opts->cpu_mask, 32,
2636 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2638 conn_warn(tconn, "bitmap_parse() failed with %d\n", err);
2639 /* retcode = ERR_CPU_MASK_PARSE; */
2643 tconn->res_opts = *res_opts;
2644 if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2645 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2646 drbd_calc_cpu_mask(tconn);
2647 tconn->receiver.reset_cpu_mask = 1;
2648 tconn->asender.reset_cpu_mask = 1;
2649 tconn->worker.reset_cpu_mask = 1;
2654 free_cpumask_var(new_cpu_mask);
2659 /* caller must be under genl_lock() */
2660 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2662 struct drbd_tconn *tconn;
2664 tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2668 tconn->name = kstrdup(name, GFP_KERNEL);
2672 if (drbd_alloc_socket(&tconn->data))
2674 if (drbd_alloc_socket(&tconn->meta))
2677 if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2680 if (set_resource_options(tconn, res_opts))
2683 if (!tl_init(tconn))
2686 tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2687 if (!tconn->current_epoch)
2689 INIT_LIST_HEAD(&tconn->current_epoch->list);
2691 spin_lock_init(&tconn->epoch_lock);
2692 tconn->write_ordering = WO_bdev_flush;
2694 tconn->cstate = C_STANDALONE;
2695 mutex_init(&tconn->cstate_mutex);
2696 spin_lock_init(&tconn->req_lock);
2697 mutex_init(&tconn->conf_update);
2698 init_waitqueue_head(&tconn->ping_wait);
2699 idr_init(&tconn->volumes);
2701 drbd_init_workqueue(&tconn->sender_work);
2702 mutex_init(&tconn->data.mutex);
2703 mutex_init(&tconn->meta.mutex);
2705 drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2706 drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2707 drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2709 kref_init(&tconn->kref);
2710 list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2715 kfree(tconn->current_epoch);
2717 free_cpumask_var(tconn->cpu_mask);
2718 drbd_free_socket(&tconn->meta);
2719 drbd_free_socket(&tconn->data);
2726 void conn_destroy(struct kref *kref)
2728 struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2730 if (atomic_read(&tconn->current_epoch->epoch_size) != 0)
2731 conn_err(tconn, "epoch_size:%d\n", atomic_read(&tconn->current_epoch->epoch_size));
2732 kfree(tconn->current_epoch);
2734 idr_destroy(&tconn->volumes);
2736 free_cpumask_var(tconn->cpu_mask);
2737 drbd_free_socket(&tconn->meta);
2738 drbd_free_socket(&tconn->data);
2740 kfree(tconn->int_dig_in);
2741 kfree(tconn->int_dig_vv);
2745 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2747 struct drbd_conf *mdev;
2748 struct gendisk *disk;
2749 struct request_queue *q;
2751 int minor_got = minor;
2752 enum drbd_ret_code err = ERR_NOMEM;
2754 mdev = minor_to_mdev(minor);
2756 return ERR_MINOR_EXISTS;
2758 /* GFP_KERNEL, we are outside of all write-out paths */
2759 mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2763 kref_get(&tconn->kref);
2764 mdev->tconn = tconn;
2766 mdev->minor = minor;
2769 drbd_init_set_defaults(mdev);
2771 q = blk_alloc_queue(GFP_KERNEL);
2775 q->queuedata = mdev;
2777 disk = alloc_disk(1);
2782 set_disk_ro(disk, true);
2785 disk->major = DRBD_MAJOR;
2786 disk->first_minor = minor;
2787 disk->fops = &drbd_ops;
2788 sprintf(disk->disk_name, "drbd%d", minor);
2789 disk->private_data = mdev;
2791 mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2792 /* we have no partitions. we contain only ourselves. */
2793 mdev->this_bdev->bd_contains = mdev->this_bdev;
2795 q->backing_dev_info.congested_fn = drbd_congested;
2796 q->backing_dev_info.congested_data = mdev;
2798 blk_queue_make_request(q, drbd_make_request);
2799 /* Setting max_hw_sectors to the odd value of 8 KiB here;
2800 this triggers a max_bio_size message upon first attach or connect. */
2801 blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
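/* Illustrative arithmetic, assuming DRBD_MAX_BIO_SIZE_SAFE is 4096 bytes:
 * max_hw_sectors counts 512-byte sectors, so 4096 >> 8 = 16 sectors, which
 * is the deliberately odd 8 KiB limit mentioned in the comment above. */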
2802 blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2803 blk_queue_merge_bvec(q, drbd_merge_bvec);
2804 q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2806 mdev->md_io_page = alloc_page(GFP_KERNEL);
2807 if (!mdev->md_io_page)
2808 goto out_no_io_page;
2810 if (drbd_bm_init(mdev))
2812 mdev->read_requests = RB_ROOT;
2813 mdev->write_requests = RB_ROOT;
2815 if (!idr_pre_get(&minors, GFP_KERNEL))
2816 goto out_no_minor_idr;
2817 if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2818 goto out_no_minor_idr;
2819 if (minor_got != minor) {
2820 err = ERR_MINOR_EXISTS;
2821 drbd_msg_put_info("requested minor exists already");
2822 goto out_idr_remove_minor;
2825 if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2826 goto out_idr_remove_minor;
2827 if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2828 goto out_idr_remove_minor;
2829 if (vnr_got != vnr) {
2830 err = ERR_INVALID_REQUEST;
2831 drbd_msg_put_info("requested volume exists already");
2832 goto out_idr_remove_vol;
2835 kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2837 /* inherit the connection state */
2838 mdev->state.conn = tconn->cstate;
2839 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2840 drbd_connected(mdev);
2845 idr_remove(&tconn->volumes, vnr_got);
2846 out_idr_remove_minor:
2847 idr_remove(&minors, minor_got);
2850 drbd_bm_cleanup(mdev);
2852 __free_page(mdev->md_io_page);
2856 blk_cleanup_queue(q);
2859 kref_put(&tconn->kref, &conn_destroy);
2863 int __init drbd_init(void)
2867 if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2869 "drbd: invalid minor_count (%d)\n", minor_count);
2873 minor_count = DRBD_MINOR_COUNT_DEF;
2877 err = register_blkdev(DRBD_MAJOR, "drbd");
2880 "drbd: unable to register block device major %d\n",
2885 err = drbd_genl_register();
2887 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2892 register_reboot_notifier(&drbd_notifier);
2895 * allocate all necessary structs
2899 init_waitqueue_head(&drbd_pp_wait);
2901 drbd_proc = NULL; /* play safe for drbd_cleanup */
2904 err = drbd_create_mempools();
2908 drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2910 printk(KERN_ERR "drbd: unable to register proc file\n");
2914 rwlock_init(&global_state_lock);
2915 INIT_LIST_HEAD(&drbd_tconns);
2917 retry.wq = create_singlethread_workqueue("drbd-reissue");
2919 printk(KERN_ERR "drbd: unable to create retry workqueue\n");
2922 INIT_WORK(&retry.worker, do_retry);
2923 spin_lock_init(&retry.lock);
2924 INIT_LIST_HEAD(&retry.writes);
2926 printk(KERN_INFO "drbd: initialized. "
2927 "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2928 API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2929 printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2930 printk(KERN_INFO "drbd: registered as block device major %d\n",
2933 return 0; /* Success! */
2938 /* currently always the case */
2939 printk(KERN_ERR "drbd: ran out of memory\n");
2941 printk(KERN_ERR "drbd: initialization failure\n");
2945 void drbd_free_bc(struct drbd_backing_dev *ldev)
2950 blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2951 blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2956 void drbd_free_sock(struct drbd_tconn *tconn)
2958 if (tconn->data.socket) {
2959 mutex_lock(&tconn->data.mutex);
2960 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2961 sock_release(tconn->data.socket);
2962 tconn->data.socket = NULL;
2963 mutex_unlock(&tconn->data.mutex);
2965 if (tconn->meta.socket) {
2966 mutex_lock(&tconn->meta.mutex);
2967 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2968 sock_release(tconn->meta.socket);
2969 tconn->meta.socket = NULL;
2970 mutex_unlock(&tconn->meta.mutex);
2974 /* meta data management */
2976 struct meta_data_on_disk {
2977 u64 la_size; /* last agreed size. */
2978 u64 uuid[UI_SIZE]; /* UUIDs. */
2981 u32 flags; /* MDF */
2984 u32 al_offset; /* offset to this block */
2985 u32 al_nr_extents; /* important for restoring the AL */
2986 /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2987 u32 bm_offset; /* offset to the bitmap, from here */
2988 u32 bm_bytes_per_bit; /* BM_BLOCK_SIZE */
2989 u32 la_peer_max_bio_size; /* last peer max_bio_size */
2990 u32 reserved_u32[3];
2992 } __packed;
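/* All multi-byte fields above are stored big-endian on disk: drbd_md_sync()
 * below converts with cpu_to_be32()/cpu_to_be64() before writing, and
 * drbd_md_read() converts back with be32_to_cpu()/be64_to_cpu(). */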
2995 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2996 * @mdev: DRBD device.
2998 void drbd_md_sync(struct drbd_conf *mdev)
3000 struct meta_data_on_disk *buffer;
3004 del_timer(&mdev->md_sync_timer);
3005 /* timer may be rearmed by drbd_md_mark_dirty() now. */
3006 if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3009 /* We use D_FAILED here and not D_ATTACHING because we try to write
3010 * metadata even if we detach due to a disk failure! */
3011 if (!get_ldev_if_state(mdev, D_FAILED))
3014 buffer = drbd_md_get_buffer(mdev);
3018 memset(buffer, 0, 512);
3020 buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3021 for (i = UI_CURRENT; i < UI_SIZE; i++)
3022 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3023 buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3024 buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
3026 buffer->md_size_sect = cpu_to_be32(mdev->ldev->md.md_size_sect);
3027 buffer->al_offset = cpu_to_be32(mdev->ldev->md.al_offset);
3028 buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3029 buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3030 buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3032 buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3033 buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3035 D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3036 sector = mdev->ldev->md.md_offset;
3038 if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3039 /* this was a try anyways ... */
3040 dev_err(DEV, "meta data update failed!\n");
3041 drbd_chk_io_error(mdev, 1, true);
3044 /* Update mdev->ldev->md.la_size_sect,
3045 * since we updated it on metadata. */
3046 mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3048 drbd_md_put_buffer(mdev);
3054 * drbd_md_read() - Reads in the meta data super block
3055 * @mdev: DRBD device.
3056 * @bdev: Device from which the meta data should be read in.
3058 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3059 * something goes wrong.
3061 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3063 struct meta_data_on_disk *buffer;
3065 int i, rv = NO_ERROR;
3067 if (!get_ldev_if_state(mdev, D_ATTACHING))
3068 return ERR_IO_MD_DISK;
3070 buffer = drbd_md_get_buffer(mdev);
3074 if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3075 /* NOTE: can't do normal error processing here as this is
3076 called BEFORE disk is attached */
3077 dev_err(DEV, "Error while reading metadata.\n");
3078 rv = ERR_IO_MD_DISK;
3082 magic = be32_to_cpu(buffer->magic);
3083 flags = be32_to_cpu(buffer->flags);
3084 if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3085 (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3086 /* btw: that's Activity Log clean, not "all" clean. */
3087 dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3088 rv = ERR_MD_UNCLEAN;
3091 if (magic != DRBD_MD_MAGIC_08) {
3092 if (magic == DRBD_MD_MAGIC_07)
3093 dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3095 dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3096 rv = ERR_MD_INVALID;
3099 if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3100 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3101 be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3102 rv = ERR_MD_INVALID;
3105 if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3106 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3107 be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3108 rv = ERR_MD_INVALID;
3111 if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3112 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3113 be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3114 rv = ERR_MD_INVALID;
3118 if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3119 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3120 be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3121 rv = ERR_MD_INVALID;
3125 bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3126 for (i = UI_CURRENT; i < UI_SIZE; i++)
3127 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3128 bdev->md.flags = be32_to_cpu(buffer->flags);
3129 bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3131 spin_lock_irq(&mdev->tconn->req_lock);
3132 if (mdev->state.conn < C_CONNECTED) {
3134 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3135 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3136 mdev->peer_max_bio_size = peer;
3138 spin_unlock_irq(&mdev->tconn->req_lock);
3141 drbd_md_put_buffer(mdev);
3149 * drbd_md_mark_dirty() - Mark meta data super block as dirty
3150 * @mdev: DRBD device.
3152 * Call this function if you change anything that should be written to
3153 * the meta-data super block. This function sets MD_DIRTY, and starts a
3154 * timer that ensures that within five seconds you have to call drbd_md_sync().
3157 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3159 if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3160 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3161 mdev->last_md_mark_dirty.line = line;
3162 mdev->last_md_mark_dirty.func = func;
3166 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3168 if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3169 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
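/* Illustrative flow only: a caller that changes anything under mdev->ldev->md
 * (e.g. via drbd_md_set_flag() or _drbd_uuid_set() below) ends up here; the
 * armed md_sync_timer later queues w_md_sync() on the worker, which writes
 * the super block via drbd_md_sync() unless the caller synced explicitly
 * before the timer expired. */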
3173 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3177 for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3178 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3181 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3183 if (idx == UI_CURRENT) {
3184 if (mdev->state.role == R_PRIMARY)
3185 val |= 1;
3186 else
3187 val &= ~((u64)1);
3189 drbd_set_ed_uuid(mdev, val);
3192 mdev->ldev->md.uuid[idx] = val;
3193 drbd_md_mark_dirty(mdev);
3197 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3199 if (mdev->ldev->md.uuid[idx]) {
3200 drbd_uuid_move_history(mdev);
3201 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3203 _drbd_uuid_set(mdev, idx, val);
3207 * drbd_uuid_new_current() - Creates a new current UUID
3208 * @mdev: DRBD device.
3210 * Creates a new current UUID, and rotates the old current UUID into
3211 * the bitmap slot. Causes an incremental resync upon next connect.
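/* Editorial sketch of the rotation performed below:
 *	md.uuid[UI_BITMAP]  <- old md.uuid[UI_CURRENT]
 *	md.uuid[UI_CURRENT] <- fresh 64-bit random value (low bit adjusted to
 *	                       the current role by _drbd_uuid_set())
 * The remembered bitmap UUID is what allows the incremental resync on the
 * next connect mentioned above. */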
3213 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3216 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3219 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3221 mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3223 get_random_bytes(&val, sizeof(u64));
3224 _drbd_uuid_set(mdev, UI_CURRENT, val);
3225 drbd_print_uuids(mdev, "new current UUID");
3226 /* get it to stable storage _now_ */
3230 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3232 if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3236 drbd_uuid_move_history(mdev);
3237 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3238 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3240 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3242 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3244 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3246 drbd_md_mark_dirty(mdev);
3250 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3251 * @mdev: DRBD device.
3253 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3255 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3259 if (get_ldev_if_state(mdev, D_ATTACHING)) {
3260 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3262 drbd_bm_set_all(mdev);
3264 rv = drbd_bm_write(mdev);
3267 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3278 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3279 * @mdev: DRBD device.
3281 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3283 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3287 drbd_resume_al(mdev);
3288 if (get_ldev_if_state(mdev, D_ATTACHING)) {
3289 drbd_bm_clear_all(mdev);
3290 rv = drbd_bm_write(mdev);
3297 static int w_bitmap_io(struct drbd_work *w, int unused)
3299 struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3300 struct drbd_conf *mdev = w->mdev;
3303 D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3305 if (get_ldev(mdev)) {
3306 drbd_bm_lock(mdev, work->why, work->flags);
3307 rv = work->io_fn(mdev);
3308 drbd_bm_unlock(mdev);
3312 clear_bit_unlock(BITMAP_IO, &mdev->flags);
3313 wake_up(&mdev->misc_wait);
3316 work->done(mdev, rv);
3318 clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3325 void drbd_ldev_destroy(struct drbd_conf *mdev)
3327 lc_destroy(mdev->resync);
3328 mdev->resync = NULL;
3329 lc_destroy(mdev->act_log);
3330 mdev->act_log = NULL;
3331 __no_warn(local,
3332 drbd_free_bc(mdev->ldev);
3333 mdev->ldev = NULL;);
3335 clear_bit(GO_DISKLESS, &mdev->flags);
3338 static int w_go_diskless(struct drbd_work *w, int unused)
3340 struct drbd_conf *mdev = w->mdev;
3342 D_ASSERT(mdev->state.disk == D_FAILED);
3343 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3344 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3345 * the protected members anymore, though, so once put_ldev reaches zero
3346 * again, it will be safe to free them. */
3347 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3351 void drbd_go_diskless(struct drbd_conf *mdev)
3353 D_ASSERT(mdev->state.disk == D_FAILED);
3354 if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3355 drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
3359 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3360 * @mdev: DRBD device.
3361 * @io_fn: IO callback to be called when bitmap IO is possible
3362 * @done: callback to be called after the bitmap IO was performed
3363 * @why: Descriptive text of the reason for doing the IO
3365 * While IO on the bitmap happens we freeze application IO thus we ensure
3366 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3367 * called from worker context. It MUST NOT be used while a previous such
3368 * work is still pending!
3370 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3371 int (*io_fn)(struct drbd_conf *),
3372 void (*done)(struct drbd_conf *, int),
3373 char *why, enum bm_flag flags)
3375 D_ASSERT(current == mdev->tconn->worker.task);
3377 D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3378 D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3379 D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3380 if (mdev->bm_io_work.why)
3381 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3382 why, mdev->bm_io_work.why);
3384 mdev->bm_io_work.io_fn = io_fn;
3385 mdev->bm_io_work.done = done;
3386 mdev->bm_io_work.why = why;
3387 mdev->bm_io_work.flags = flags;
3389 spin_lock_irq(&mdev->tconn->req_lock);
3390 set_bit(BITMAP_IO, &mdev->flags);
3391 if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3392 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3393 drbd_queue_work(&mdev->tconn->sender_work, &mdev->bm_io_work.w);
3395 spin_unlock_irq(&mdev->tconn->req_lock);
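/* Illustrative call only; "my_done_fn" is a placeholder, not a drbd symbol:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, my_done_fn,
 *			     "set_n_write", BM_LOCKED_SET_ALLOWED);
 *
 * w_bitmap_io() later runs the io_fn with the bitmap locked and passes its
 * return value to the done callback. */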
3399 * drbd_bitmap_io() - Does an IO operation on the whole bitmap
3400 * @mdev: DRBD device.
3401 * @io_fn: IO callback to be called when bitmap IO is possible
3402 * @why: Descriptive text of the reason for doing the IO
3404 * Freezes application IO while the actual IO operation runs. This
3405 * function MAY NOT be called from worker context.
3407 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3408 char *why, enum bm_flag flags)
3412 D_ASSERT(current != mdev->tconn->worker.task);
3414 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3415 drbd_suspend_io(mdev);
3417 drbd_bm_lock(mdev, why, flags);
3418 rv = io_fn(mdev);
3419 drbd_bm_unlock(mdev);
3421 if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3422 drbd_resume_io(mdev);
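/* Contrast with drbd_queue_bitmap_io() above: this variant runs io_fn
 * synchronously in the caller's context (never the worker) and suspends and
 * resumes application IO around it unless BM_LOCKED_SET_ALLOWED was passed. */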
3427 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3429 if ((mdev->ldev->md.flags & flag) != flag) {
3430 drbd_md_mark_dirty(mdev);
3431 mdev->ldev->md.flags |= flag;
3435 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3437 if ((mdev->ldev->md.flags & flag) != 0) {
3438 drbd_md_mark_dirty(mdev);
3439 mdev->ldev->md.flags &= ~flag;
3442 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3444 return (bdev->md.flags & flag) != 0;
3447 static void md_sync_timer_fn(unsigned long data)
3449 struct drbd_conf *mdev = (struct drbd_conf *) data;
3451 drbd_queue_work_front(&mdev->tconn->sender_work, &mdev->md_sync_work);
3454 static int w_md_sync(struct drbd_work *w, int unused)
3456 struct drbd_conf *mdev = w->mdev;
3458 dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3460 dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3461 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3467 const char *cmdname(enum drbd_packet cmd)
3469 /* THINK may need to become several global tables
3470 * when we want to support more than
3471 * one PRO_VERSION */
3472 static const char *cmdnames[] = {
3474 [P_DATA_REPLY] = "DataReply",
3475 [P_RS_DATA_REPLY] = "RSDataReply",
3476 [P_BARRIER] = "Barrier",
3477 [P_BITMAP] = "ReportBitMap",
3478 [P_BECOME_SYNC_TARGET] = "BecomeSyncTarget",
3479 [P_BECOME_SYNC_SOURCE] = "BecomeSyncSource",
3480 [P_UNPLUG_REMOTE] = "UnplugRemote",
3481 [P_DATA_REQUEST] = "DataRequest",
3482 [P_RS_DATA_REQUEST] = "RSDataRequest",
3483 [P_SYNC_PARAM] = "SyncParam",
3484 [P_SYNC_PARAM89] = "SyncParam89",
3485 [P_PROTOCOL] = "ReportProtocol",
3486 [P_UUIDS] = "ReportUUIDs",
3487 [P_SIZES] = "ReportSizes",
3488 [P_STATE] = "ReportState",
3489 [P_SYNC_UUID] = "ReportSyncUUID",
3490 [P_AUTH_CHALLENGE] = "AuthChallenge",
3491 [P_AUTH_RESPONSE] = "AuthResponse",
3493 [P_PING_ACK] = "PingAck",
3494 [P_RECV_ACK] = "RecvAck",
3495 [P_WRITE_ACK] = "WriteAck",
3496 [P_RS_WRITE_ACK] = "RSWriteAck",
3497 [P_DISCARD_WRITE] = "DiscardWrite",
3498 [P_NEG_ACK] = "NegAck",
3499 [P_NEG_DREPLY] = "NegDReply",
3500 [P_NEG_RS_DREPLY] = "NegRSDReply",
3501 [P_BARRIER_ACK] = "BarrierAck",
3502 [P_STATE_CHG_REQ] = "StateChgRequest",
3503 [P_STATE_CHG_REPLY] = "StateChgReply",
3504 [P_OV_REQUEST] = "OVRequest",
3505 [P_OV_REPLY] = "OVReply",
3506 [P_OV_RESULT] = "OVResult",
3507 [P_CSUM_RS_REQUEST] = "CsumRSRequest",
3508 [P_RS_IS_IN_SYNC] = "CsumRSIsInSync",
3509 [P_COMPRESSED_BITMAP] = "CBitmap",
3510 [P_DELAY_PROBE] = "DelayProbe",
3511 [P_OUT_OF_SYNC] = "OutOfSync",
3512 [P_RETRY_WRITE] = "RetryWrite",
3513 [P_RS_CANCEL] = "RSCancel",
3514 [P_CONN_ST_CHG_REQ] = "conn_st_chg_req",
3515 [P_CONN_ST_CHG_REPLY] = "conn_st_chg_reply",
3516 [P_RETRY_WRITE] = "retry_write",
3517 [P_PROTOCOL_UPDATE] = "protocol_update",
3519 /* enum drbd_packet, but not commands - obsoleted flags:
3525 /* too big for the array: 0xfffX */
3526 if (cmd == P_INITIAL_META)
3527 return "InitialMeta";
3528 if (cmd == P_INITIAL_DATA)
3529 return "InitialData";
3530 if (cmd == P_CONNECTION_FEATURES)
3531 return "ConnectionFeatures";
3532 if (cmd >= ARRAY_SIZE(cmdnames))
3534 return cmdnames[cmd];
3538 * drbd_wait_misc - wait for a request to make progress
3539 * @mdev: device associated with the request
3540 * @i: the struct drbd_interval embedded in struct drbd_request or
3541 * struct drbd_peer_request
3543 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3545 struct net_conf *nc;
3550 nc = rcu_dereference(mdev->tconn->net_conf);
3555 timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
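/* nc->timeout is configured in tenths of a second, hence the "* HZ / 10".
 * Worked example (illustrative values): timeout = 60 (6 s) with ko_count = 7
 * waits up to 42 s; ko_count = 0 means wait indefinitely
 * (MAX_SCHEDULE_TIMEOUT). */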
3558 /* Indicate to wake up mdev->misc_wait on progress. */
3560 prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3561 spin_unlock_irq(&mdev->tconn->req_lock);
3562 timeout = schedule_timeout(timeout);
3563 finish_wait(&mdev->misc_wait, &wait);
3564 spin_lock_irq(&mdev->tconn->req_lock);
3565 if (!timeout || mdev->state.conn < C_CONNECTED)
3567 if (signal_pending(current))
3568 return -ERESTARTSYS;
3572 #ifdef CONFIG_DRBD_FAULT_INJECTION
3573 /* Fault insertion support including random number generator shamelessly
3574 * stolen from kernel/rcutorture.c */
3575 struct fault_random_state {
3576 unsigned long state;
3577 unsigned long count;
3580 #define FAULT_RANDOM_MULT 39916801 /* prime */
3581 #define FAULT_RANDOM_ADD 479001701 /* prime */
3582 #define FAULT_RANDOM_REFRESH 10000
3585 * Crude but fast random-number generator. Uses a linear congruential
3586 * generator, with occasional help from get_random_bytes().
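/* Editorial note: the step below is the classic linear congruential
 * recurrence,
 *	state = state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD   (mod 2^BITS_PER_LONG)
 * reseeded from get_random_bytes() every FAULT_RANDOM_REFRESH draws, with the
 * half-words of the result swapped (swahw32) before use. */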
3588 static unsigned long
3589 _drbd_fault_random(struct fault_random_state *rsp)
3593 if (!rsp->count--) {
3594 get_random_bytes(&refresh, sizeof(refresh));
3595 rsp->state += refresh;
3596 rsp->count = FAULT_RANDOM_REFRESH;
3598 rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3599 return swahw32(rsp->state);
3603 _drbd_fault_str(unsigned int type) {
3604 static char *_faults[] = {
3605 [DRBD_FAULT_MD_WR] = "Meta-data write",
3606 [DRBD_FAULT_MD_RD] = "Meta-data read",
3607 [DRBD_FAULT_RS_WR] = "Resync write",
3608 [DRBD_FAULT_RS_RD] = "Resync read",
3609 [DRBD_FAULT_DT_WR] = "Data write",
3610 [DRBD_FAULT_DT_RD] = "Data read",
3611 [DRBD_FAULT_DT_RA] = "Data read ahead",
3612 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3613 [DRBD_FAULT_AL_EE] = "EE allocation",
3614 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3617 return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3621 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3623 static struct fault_random_state rrs = {0, 0};
3625 unsigned int ret = (
3627 ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3628 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
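/* Worked example with illustrative module parameter values: fault_rate=10
 * gives each eligible request roughly a 10% chance of a simulated failure
 * (the draw lands in 1..100), and fault_devs=0x5 limits injection to the
 * devices with minors 0 and 2, since (1 << minor) is tested against the
 * fault_devs bitmap above. */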
3633 if (__ratelimit(&drbd_ratelimit_state))
3634 dev_warn(DEV, "***Simulating %s failure\n",
3635 _drbd_fault_str(type));
3642 const char *drbd_buildtag(void)
3644 /* When DRBD is built from external sources, this holds a reference to the
3645 git hash of the source code. */
3647 static char buildtag[38] = "\0uilt-in";
3649 if (buildtag[0] == 0) {
3650 #ifdef CONFIG_MODULES
3651 if (THIS_MODULE != NULL)
3652 sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3661 module_init(drbd_init)
3662 module_exit(drbd_cleanup)
3664 EXPORT_SYMBOL(drbd_conn_str);
3665 EXPORT_SYMBOL(drbd_role_str);
3666 EXPORT_SYMBOL(drbd_disk_str);
3667 EXPORT_SYMBOL(drbd_set_st_err_str);