drbd: move the drbd_work_queue from drbd_socket to drbd_connection
[linux-2.6-block.git] drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count; see the example below */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameters, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
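/*
 * Illustrative example only (values made up): with drbd built into the
 * kernel, the parameters above become boot parameters, e.g. on the kernel
 * command line:
 *
 *   drbd.minor_count=8 drbd.usermode_helper=/usr/local/sbin/drbdadm
 *
 * Built as a module, the equivalent would be:
 *
 *   modprobe drbd minor_count=8 usermode_helper=/usr/local/sbin/drbdadm
 */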
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page (see the illustrative sketch below).
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
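/*
 * Illustrative sketch only, not part of the driver: how a singly linked
 * page list keyed off page->private (as described above) is typically
 * pushed and popped.  The example_pp_* names are hypothetical; the real
 * pool code lives on the receive path and additionally handles
 * drbd_pp_wait and allocation fallbacks.
 */
static void example_pp_push(struct page *page)
{
        spin_lock(&drbd_pp_lock);
        set_page_private(page, (unsigned long)drbd_pp_pool);
        drbd_pp_pool = page;
        drbd_pp_vacant++;
        spin_unlock(&drbd_pp_lock);
}

static struct page *example_pp_pop(void)
{
        struct page *page;

        spin_lock(&drbd_pp_lock);
        page = drbd_pp_pool;
        if (page) {
                drbd_pp_pool = (struct page *)page_private(page);
                set_page_private(page, 0);
                drbd_pp_vacant--;
        }
        spin_unlock(&drbd_pp_lock);
        return page;
}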
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
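/*
 * Illustrative sketch only, not part of the driver: how a caller might
 * use bio_alloc_drbd() for a synchronous single-page write.  The
 * example_* names are hypothetical and error handling is minimal; the
 * driver's own metadata I/O helpers do considerably more.
 */
static void example_md_io_complete(struct bio *bio, int error)
{
        struct completion *done = bio->bi_private;

        bio_put(bio);   /* frees via bio_destructor_drbd() above */
        complete(done);
}

static int example_write_page(struct block_device *bdev, struct page *page,
                              sector_t sector)
{
        DECLARE_COMPLETION_ONSTACK(done);
        struct bio *bio = bio_alloc_drbd(GFP_NOIO);

        if (!bio)
                return -ENOMEM;
        bio->bi_bdev = bdev;
        bio->bi_sector = sector;
        bio_add_page(bio, page, PAGE_SIZE, 0);
        bio->bi_private = &done;
        bio->bi_end_io = example_md_io_complete;
        submit_bio(WRITE, bio);
        wait_for_completion(&done);
        return 0;
}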
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
198  * attached.
199  */
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = atomic_inc_return(&tconn->current_tle_nr);
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218         INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220         return 1;
221 }
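/*
 * Illustrative sketch only (hypothetical helper, not part of the driver):
 * walking the transfer log as laid out above -- epochs from oldest_tle to
 * newest_tle via ->next, and the requests attached to each epoch via the
 * epoch's request list.  Must be called with tconn->req_lock held.
 */
static void example_tl_walk(struct drbd_tconn *tconn)
{
        struct drbd_tl_epoch *b;
        struct drbd_request *req;
        unsigned int n;

        for (b = tconn->oldest_tle; b; b = b->next) {
                n = 0;
                list_for_each_entry(req, &b->requests, tl_requests)
                        n++;
                conn_info(tconn, "epoch #%u holds %u request(s)\n",
                          b->br_number, n);
        }
}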
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         INIT_LIST_HEAD(&new->requests);
245         INIT_LIST_HEAD(&new->w.list);
246         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
247         new->next = NULL;
248         new->n_writes = 0;
249
250         new->br_number = atomic_inc_return(&tconn->current_tle_nr);
251         if (tconn->newest_tle != new) {
252                 tconn->newest_tle->next = new;
253                 tconn->newest_tle = new;
254         }
255 }
256
257 /**
258  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
259  * @tconn:      DRBD connection.
260  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
261  * @set_size:   Expected number of requests before that barrier.
262  *
263  * In case the passed barrier_nr or set_size does not match the oldest
264  * &struct drbd_tl_epoch objects this function will cause a termination
265  * of the connection.
266  */
267 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
268                 unsigned int set_size)
269 {
270         struct drbd_conf *mdev;
271         struct drbd_tl_epoch *b, *nob; /* next old barrier */
272         struct list_head *le, *tle;
273         struct drbd_request *r;
274
275         spin_lock_irq(&tconn->req_lock);
276
277         b = tconn->oldest_tle;
278
279         /* first some paranoia code */
280         if (b == NULL) {
281                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
282                          barrier_nr);
283                 goto bail;
284         }
285         if (b->br_number != barrier_nr) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
287                          barrier_nr, b->br_number);
288                 goto bail;
289         }
290         if (b->n_writes != set_size) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
292                          barrier_nr, set_size, b->n_writes);
293                 goto bail;
294         }
295
296         /* Clean up list of requests processed during current epoch */
297         list_for_each_safe(le, tle, &b->requests) {
298                 r = list_entry(le, struct drbd_request, tl_requests);
299                 _req_mod(r, BARRIER_ACKED);
300         }
301         /* There could be requests on the list waiting for completion
302            of the write to the local disk. To avoid corruption of the
303            slab's data structures we have to remove the list's head.
304
305            Also there could have been a barrier ack out of sequence, overtaking
306            the write acks - which would be a bug and a violation of write ordering.
307            To not deadlock in case we lose the connection while such requests are
308            still pending, we need some way to find them for
309            _req_mod(CONNECTION_LOST_WHILE_PENDING).
310
311            These have been list_move'd to the out_of_sequence_requests list in
312            _req_mod(, BARRIER_ACKED) above.
313            */
314         list_splice_init(&b->requests, &tconn->barrier_acked_requests);
315         mdev = b->w.mdev;
316
317         nob = b->next;
318         if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
319                 _tl_add_barrier(tconn, b);
320                 if (nob)
321                         tconn->oldest_tle = nob;
322                 /* if nob == NULL, b was the only barrier, and becomes the new
323                    barrier. Therefore tconn->oldest_tle already points to b */
324         } else {
325                 D_ASSERT(nob != NULL);
326                 tconn->oldest_tle = nob;
327                 kfree(b);
328         }
329
330         spin_unlock_irq(&tconn->req_lock);
331         dec_ap_pending(mdev);
332
333         return;
334
335 bail:
336         spin_unlock_irq(&tconn->req_lock);
337         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
338 }
339
340
341 /**
342  * _tl_restart() - Walks the transfer log, and applies an action to all requests
343  * @tconn:      DRBD connection.
344  * @what:       The action/event to perform with all request objects
345  *
346  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
347  * RESTART_FROZEN_DISK_IO.
348  */
349 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
350 {
351         struct drbd_tl_epoch *b, *tmp, **pn;
352         struct list_head *le, *tle, carry_reads;
353         struct drbd_request *req;
354         int rv, n_writes, n_reads;
355
356         b = tconn->oldest_tle;
357         pn = &tconn->oldest_tle;
358         while (b) {
359                 n_writes = 0;
360                 n_reads = 0;
361                 INIT_LIST_HEAD(&carry_reads);
362                 list_for_each_safe(le, tle, &b->requests) {
363                         req = list_entry(le, struct drbd_request, tl_requests);
364                         rv = _req_mod(req, what);
365
366                         if (rv & MR_WRITE)
367                                 n_writes++;
368                         if (rv & MR_READ)
369                                 n_reads++;
370                 }
371                 tmp = b->next;
372
373                 if (n_writes) {
374                         if (what == RESEND) {
375                                 b->n_writes = n_writes;
376                                 if (b->w.cb == NULL) {
377                                         b->w.cb = w_send_barrier;
378                                         inc_ap_pending(b->w.mdev);
379                                         set_bit(CREATE_BARRIER, &tconn->flags);
380                                 }
381
382                                 drbd_queue_work(&tconn->sender_work, &b->w);
383                         }
384                         pn = &b->next;
385                 } else {
386                         if (n_reads)
387                                 list_add(&carry_reads, &b->requests);
388                         /* there could still be requests on that ring list,
389                          * in case local io is still pending */
390                         list_del(&b->requests);
391
392                         /* dec_ap_pending corresponding to queue_barrier.
393                          * the newest barrier may not have been queued yet,
394                          * in which case w.cb is still NULL. */
395                         if (b->w.cb != NULL)
396                                 dec_ap_pending(b->w.mdev);
397
398                         if (b == tconn->newest_tle) {
399                                 /* recycle, but reinit! */
400                                 if (tmp != NULL)
401                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
402                                 INIT_LIST_HEAD(&b->requests);
403                                 list_splice(&carry_reads, &b->requests);
404                                 INIT_LIST_HEAD(&b->w.list);
405                                 b->w.cb = NULL;
406                                 b->br_number = atomic_inc_return(&tconn->current_tle_nr);
407                                 b->n_writes = 0;
408
409                                 *pn = b;
410                                 break;
411                         }
412                         *pn = tmp;
413                         kfree(b);
414                 }
415                 b = tmp;
416                 list_splice(&carry_reads, &b->requests);
417         }
418
419         /* Actions operating on the disk state also want to work on
420            requests that got barrier acked. */
421         switch (what) {
422         case FAIL_FROZEN_DISK_IO:
423         case RESTART_FROZEN_DISK_IO:
424                 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
425                         req = list_entry(le, struct drbd_request, tl_requests);
426                         _req_mod(req, what);
427                 }
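                /* fall through */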
428         case CONNECTION_LOST_WHILE_PENDING:
429         case RESEND:
430                 break;
431         default:
432                 conn_err(tconn, "what = %d in _tl_restart()\n", what);
433         }
434 }
435
436 /**
437  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
438  * @tconn:      DRBD connection.
439  *
440  * This is called after the connection to the peer was lost. The storage covered
441  * by the requests on the transfer log gets marked as out of sync. Called from
442  * the receiver thread and the worker thread.
443  */
444 void tl_clear(struct drbd_tconn *tconn)
445 {
446         struct list_head *le, *tle;
447         struct drbd_request *r;
448
449         spin_lock_irq(&tconn->req_lock);
450
451         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
452
453         /* we expect this list to be empty. */
454         if (!list_empty(&tconn->out_of_sequence_requests))
455                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
456
457         /* but just in case, clean it up anyway! */
458         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
459                 r = list_entry(le, struct drbd_request, tl_requests);
460                 /* It would be nice to complete outside of the spinlock.
461                  * But this is easier for now. */
462                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
463         }
464
465         /* ensure bit indicating barrier is required is clear */
466         clear_bit(CREATE_BARRIER, &tconn->flags);
467
468         spin_unlock_irq(&tconn->req_lock);
469 }
470
471 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
472 {
473         spin_lock_irq(&tconn->req_lock);
474         _tl_restart(tconn, what);
475         spin_unlock_irq(&tconn->req_lock);
476 }
477
478 /**
479  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
480  * @mdev:       DRBD device.
481  */
482 void tl_abort_disk_io(struct drbd_conf *mdev)
483 {
484         struct drbd_tconn *tconn = mdev->tconn;
485         struct drbd_tl_epoch *b;
486         struct list_head *le, *tle;
487         struct drbd_request *req;
488
489         spin_lock_irq(&tconn->req_lock);
490         b = tconn->oldest_tle;
491         while (b) {
492                 list_for_each_safe(le, tle, &b->requests) {
493                         req = list_entry(le, struct drbd_request, tl_requests);
494                         if (!(req->rq_state & RQ_LOCAL_PENDING))
495                                 continue;
496                         if (req->w.mdev == mdev)
497                                 _req_mod(req, ABORT_DISK_IO);
498                 }
499                 b = b->next;
500         }
501
502         list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
503                 req = list_entry(le, struct drbd_request, tl_requests);
504                 if (!(req->rq_state & RQ_LOCAL_PENDING))
505                         continue;
506                 if (req->w.mdev == mdev)
507                         _req_mod(req, ABORT_DISK_IO);
508         }
509
510         spin_unlock_irq(&tconn->req_lock);
511 }
512
513 static int drbd_thread_setup(void *arg)
514 {
515         struct drbd_thread *thi = (struct drbd_thread *) arg;
516         struct drbd_tconn *tconn = thi->tconn;
517         unsigned long flags;
518         int retval;
519
520         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
521                  thi->name[0], thi->tconn->name);
522
523 restart:
524         retval = thi->function(thi);
525
526         spin_lock_irqsave(&thi->t_lock, flags);
527
528         /* if the receiver has been "EXITING", the last thing it did
529          * was set the conn state to "StandAlone",
530          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
531          * and receiver thread will be "started".
532          * drbd_thread_start needs to set "RESTARTING" in that case.
533          * t_state check and assignment needs to be within the same spinlock,
534          * so either thread_start sees EXITING, and can remap to RESTARTING,
535          * or thread_start sees NONE, and can proceed as normal.
536          */
537
538         if (thi->t_state == RESTARTING) {
539                 conn_info(tconn, "Restarting %s thread\n", thi->name);
540                 thi->t_state = RUNNING;
541                 spin_unlock_irqrestore(&thi->t_lock, flags);
542                 goto restart;
543         }
544
545         thi->task = NULL;
546         thi->t_state = NONE;
547         smp_mb();
548         complete_all(&thi->stop);
549         spin_unlock_irqrestore(&thi->t_lock, flags);
550
551         conn_info(tconn, "Terminating %s\n", current->comm);
552
553         /* Release mod reference taken when thread was started */
554
555         kref_put(&tconn->kref, &conn_destroy);
556         module_put(THIS_MODULE);
557         return retval;
558 }
559
560 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
561                              int (*func) (struct drbd_thread *), char *name)
562 {
563         spin_lock_init(&thi->t_lock);
564         thi->task    = NULL;
565         thi->t_state = NONE;
566         thi->function = func;
567         thi->tconn = tconn;
568         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
569 }
570
571 int drbd_thread_start(struct drbd_thread *thi)
572 {
573         struct drbd_tconn *tconn = thi->tconn;
574         struct task_struct *nt;
575         unsigned long flags;
576
577         /* is used from state engine doing drbd_thread_stop_nowait,
578          * while holding the req lock irqsave */
579         spin_lock_irqsave(&thi->t_lock, flags);
580
581         switch (thi->t_state) {
582         case NONE:
583                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
584                          thi->name, current->comm, current->pid);
585
586                 /* Get ref on module for thread - this is released when thread exits */
587                 if (!try_module_get(THIS_MODULE)) {
588                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
589                         spin_unlock_irqrestore(&thi->t_lock, flags);
590                         return false;
591                 }
592
593                 kref_get(&thi->tconn->kref);
594
595                 init_completion(&thi->stop);
596                 thi->reset_cpu_mask = 1;
597                 thi->t_state = RUNNING;
598                 spin_unlock_irqrestore(&thi->t_lock, flags);
599                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
600
601                 nt = kthread_create(drbd_thread_setup, (void *) thi,
602                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
603
604                 if (IS_ERR(nt)) {
605                         conn_err(tconn, "Couldn't start thread\n");
606
607                         kref_put(&tconn->kref, &conn_destroy);
608                         module_put(THIS_MODULE);
609                         return false;
610                 }
611                 spin_lock_irqsave(&thi->t_lock, flags);
612                 thi->task = nt;
613                 thi->t_state = RUNNING;
614                 spin_unlock_irqrestore(&thi->t_lock, flags);
615                 wake_up_process(nt);
616                 break;
617         case EXITING:
618                 thi->t_state = RESTARTING;
619                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
620                                 thi->name, current->comm, current->pid);
621                 /* fall through */
622         case RUNNING:
623         case RESTARTING:
624         default:
625                 spin_unlock_irqrestore(&thi->t_lock, flags);
626                 break;
627         }
628
629         return true;
630 }
631
632
633 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
634 {
635         unsigned long flags;
636
637         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
638
639         /* may be called from state engine, holding the req lock irqsave */
640         spin_lock_irqsave(&thi->t_lock, flags);
641
642         if (thi->t_state == NONE) {
643                 spin_unlock_irqrestore(&thi->t_lock, flags);
644                 if (restart)
645                         drbd_thread_start(thi);
646                 return;
647         }
648
649         if (thi->t_state != ns) {
650                 if (thi->task == NULL) {
651                         spin_unlock_irqrestore(&thi->t_lock, flags);
652                         return;
653                 }
654
655                 thi->t_state = ns;
656                 smp_mb();
657                 init_completion(&thi->stop);
658                 if (thi->task != current)
659                         force_sig(DRBD_SIGKILL, thi->task);
660         }
661
662         spin_unlock_irqrestore(&thi->t_lock, flags);
663
664         if (wait)
665                 wait_for_completion(&thi->stop);
666 }
667
668 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
669 {
670         struct drbd_thread *thi =
671                 task == tconn->receiver.task ? &tconn->receiver :
672                 task == tconn->asender.task  ? &tconn->asender :
673                 task == tconn->worker.task   ? &tconn->worker : NULL;
674
675         return thi;
676 }
677
678 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
679 {
680         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
681         return thi ? thi->name : task->comm;
682 }
683
684 int conn_lowest_minor(struct drbd_tconn *tconn)
685 {
686         struct drbd_conf *mdev;
687         int vnr = 0, m;
688
689         rcu_read_lock();
690         mdev = idr_get_next(&tconn->volumes, &vnr);
691         m = mdev ? mdev_to_minor(mdev) : -1;
692         rcu_read_unlock();
693
694         return m;
695 }
696
697 #ifdef CONFIG_SMP
698 /**
699  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
700  * @mdev:       DRBD device.
701  *
702  * Forces all threads of a device onto the same CPU. This is beneficial for
703  * DRBD's performance. May be overwritten by user's configuration.
704  */
705 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
706 {
707         int ord, cpu;
708
709         /* user override. */
710         if (cpumask_weight(tconn->cpu_mask))
711                 return;
712
713         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
714         for_each_online_cpu(cpu) {
715                 if (ord-- == 0) {
716                         cpumask_set_cpu(cpu, tconn->cpu_mask);
717                         return;
718                 }
719         }
720         /* should not be reached */
721         cpumask_setall(tconn->cpu_mask);
722 }
723
724 /**
725  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
726  * @thi:        drbd_thread object
727  *
728  * Call this in the "main loop" of _all_ threads;
729  * no need for any mutex, current won't die
730  * prematurely.
731  */
732 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
733 {
734         struct task_struct *p = current;
735
736         if (!thi->reset_cpu_mask)
737                 return;
738         thi->reset_cpu_mask = 0;
739         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
740 }
741 #endif
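/*
 * Illustrative sketch only: the calling pattern the comment above asks
 * for -- a thread body re-applying the CPU mask at the top of its main
 * loop.  example_thread_fn is hypothetical; the real worker, receiver
 * and asender loops do this alongside their actual work.
 */
static int example_thread_fn(struct drbd_thread *thi)
{
        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);
                /* ... wait for and handle one unit of work ... */
        }
        return 0;
}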
742
743 /**
744  * drbd_header_size  -  size of a packet header
745  *
746  * The header size is a multiple of 8, so any payload following the header is
747  * word aligned on 64-bit architectures.  (The bitmap send and receive code
748  * relies on this.)
749  */
750 unsigned int drbd_header_size(struct drbd_tconn *tconn)
751 {
752         if (tconn->agreed_pro_version >= 100) {
753                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
754                 return sizeof(struct p_header100);
755         } else {
756                 BUILD_BUG_ON(sizeof(struct p_header80) !=
757                              sizeof(struct p_header95));
758                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
759                 return sizeof(struct p_header80);
760         }
761 }
762
763 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
764 {
765         h->magic   = cpu_to_be32(DRBD_MAGIC);
766         h->command = cpu_to_be16(cmd);
767         h->length  = cpu_to_be16(size);
768         return sizeof(struct p_header80);
769 }
770
771 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
772 {
773         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
774         h->command = cpu_to_be16(cmd);
775         h->length = cpu_to_be32(size);
776         return sizeof(struct p_header95);
777 }
778
779 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
780                                       int size, int vnr)
781 {
782         h->magic = cpu_to_be32(DRBD_MAGIC_100);
783         h->volume = cpu_to_be16(vnr);
784         h->command = cpu_to_be16(cmd);
785         h->length = cpu_to_be32(size);
786         h->pad = 0;
787         return sizeof(struct p_header100);
788 }
789
790 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
791                                    void *buffer, enum drbd_packet cmd, int size)
792 {
793         if (tconn->agreed_pro_version >= 100)
794                 return prepare_header100(buffer, cmd, size, vnr);
795         else if (tconn->agreed_pro_version >= 95 &&
796                  size > DRBD_MAX_SIZE_H80_PACKET)
797                 return prepare_header95(buffer, cmd, size);
798         else
799                 return prepare_header80(buffer, cmd, size);
800 }
801
802 static void *__conn_prepare_command(struct drbd_tconn *tconn,
803                                     struct drbd_socket *sock)
804 {
805         if (!sock->socket)
806                 return NULL;
807         return sock->sbuf + drbd_header_size(tconn);
808 }
809
810 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
811 {
812         void *p;
813
814         mutex_lock(&sock->mutex);
815         p = __conn_prepare_command(tconn, sock);
816         if (!p)
817                 mutex_unlock(&sock->mutex);
818
819         return p;
820 }
821
822 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
823 {
824         return conn_prepare_command(mdev->tconn, sock);
825 }
826
827 static int __send_command(struct drbd_tconn *tconn, int vnr,
828                           struct drbd_socket *sock, enum drbd_packet cmd,
829                           unsigned int header_size, void *data,
830                           unsigned int size)
831 {
832         int msg_flags;
833         int err;
834
835         /*
836          * Called with @data == NULL and the size of the data blocks in @size
837          * for commands that send data blocks.  For those commands, omit the
838          * MSG_MORE flag: this will increase the likelihood that data blocks
839          * which are page aligned on the sender will end up page aligned on the
840          * receiver.
841          */
842         msg_flags = data ? MSG_MORE : 0;
843
844         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
845                                       header_size + size);
846         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
847                             msg_flags);
848         if (data && !err)
849                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
850         return err;
851 }
852
853 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
854                                enum drbd_packet cmd, unsigned int header_size,
855                                void *data, unsigned int size)
856 {
857         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
858 }
859
860 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
861                       enum drbd_packet cmd, unsigned int header_size,
862                       void *data, unsigned int size)
863 {
864         int err;
865
866         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
867         mutex_unlock(&sock->mutex);
868         return err;
869 }
870
871 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
872                       enum drbd_packet cmd, unsigned int header_size,
873                       void *data, unsigned int size)
874 {
875         int err;
876
877         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
878                              data, size);
879         mutex_unlock(&sock->mutex);
880         return err;
881 }
882
883 int drbd_send_ping(struct drbd_tconn *tconn)
884 {
885         struct drbd_socket *sock;
886
887         sock = &tconn->meta;
888         if (!conn_prepare_command(tconn, sock))
889                 return -EIO;
890         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
891 }
892
893 int drbd_send_ping_ack(struct drbd_tconn *tconn)
894 {
895         struct drbd_socket *sock;
896
897         sock = &tconn->meta;
898         if (!conn_prepare_command(tconn, sock))
899                 return -EIO;
900         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
901 }
902
903 int drbd_send_sync_param(struct drbd_conf *mdev)
904 {
905         struct drbd_socket *sock;
906         struct p_rs_param_95 *p;
907         int size;
908         const int apv = mdev->tconn->agreed_pro_version;
909         enum drbd_packet cmd;
910         struct net_conf *nc;
911         struct disk_conf *dc;
912
913         sock = &mdev->tconn->data;
914         p = drbd_prepare_command(mdev, sock);
915         if (!p)
916                 return -EIO;
917
918         rcu_read_lock();
919         nc = rcu_dereference(mdev->tconn->net_conf);
920
921         size = apv <= 87 ? sizeof(struct p_rs_param)
922                 : apv == 88 ? sizeof(struct p_rs_param)
923                         + strlen(nc->verify_alg) + 1
924                 : apv <= 94 ? sizeof(struct p_rs_param_89)
925                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
926
927         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
928
929         /* initialize verify_alg and csums_alg */
930         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
931
932         if (get_ldev(mdev)) {
933                 dc = rcu_dereference(mdev->ldev->disk_conf);
934                 p->resync_rate = cpu_to_be32(dc->resync_rate);
935                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
936                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
937                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
938                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
939                 put_ldev(mdev);
940         } else {
941                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
942                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
943                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
944                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
945                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
946         }
947
948         if (apv >= 88)
949                 strcpy(p->verify_alg, nc->verify_alg);
950         if (apv >= 89)
951                 strcpy(p->csums_alg, nc->csums_alg);
952         rcu_read_unlock();
953
954         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
955 }
956
957 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
958 {
959         struct drbd_socket *sock;
960         struct p_protocol *p;
961         struct net_conf *nc;
962         int size, cf;
963
964         sock = &tconn->data;
965         p = __conn_prepare_command(tconn, sock);
966         if (!p)
967                 return -EIO;
968
969         rcu_read_lock();
970         nc = rcu_dereference(tconn->net_conf);
971
972         if (nc->tentative && tconn->agreed_pro_version < 92) {
973                 rcu_read_unlock();
974                 mutex_unlock(&sock->mutex);
975                 conn_err(tconn, "--dry-run is not supported by peer");
976                 return -EOPNOTSUPP;
977         }
978
979         size = sizeof(*p);
980         if (tconn->agreed_pro_version >= 87)
981                 size += strlen(nc->integrity_alg) + 1;
982
983         p->protocol      = cpu_to_be32(nc->wire_protocol);
984         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
985         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
986         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
987         p->two_primaries = cpu_to_be32(nc->two_primaries);
988         cf = 0;
989         if (nc->discard_my_data)
990                 cf |= CF_DISCARD_MY_DATA;
991         if (nc->tentative)
992                 cf |= CF_DRY_RUN;
993         p->conn_flags    = cpu_to_be32(cf);
994
995         if (tconn->agreed_pro_version >= 87)
996                 strcpy(p->integrity_alg, nc->integrity_alg);
997         rcu_read_unlock();
998
999         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1000 }
1001
1002 int drbd_send_protocol(struct drbd_tconn *tconn)
1003 {
1004         int err;
1005
1006         mutex_lock(&tconn->data.mutex);
1007         err = __drbd_send_protocol(tconn, P_PROTOCOL);
1008         mutex_unlock(&tconn->data.mutex);
1009
1010         return err;
1011 }
1012
1013 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1014 {
1015         struct drbd_socket *sock;
1016         struct p_uuids *p;
1017         int i;
1018
1019         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1020                 return 0;
1021
1022         sock = &mdev->tconn->data;
1023         p = drbd_prepare_command(mdev, sock);
1024         if (!p) {
1025                 put_ldev(mdev);
1026                 return -EIO;
1027         }
1028         for (i = UI_CURRENT; i < UI_SIZE; i++)
1029                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1030
1031         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1032         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1033         rcu_read_lock();
1034         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1035         rcu_read_unlock();
1036         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1037         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1038         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1039
1040         put_ldev(mdev);
1041         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1042 }
1043
1044 int drbd_send_uuids(struct drbd_conf *mdev)
1045 {
1046         return _drbd_send_uuids(mdev, 0);
1047 }
1048
1049 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1050 {
1051         return _drbd_send_uuids(mdev, 8);
1052 }
1053
1054 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1055 {
1056         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1057                 u64 *uuid = mdev->ldev->md.uuid;
1058                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1059                      text,
1060                      (unsigned long long)uuid[UI_CURRENT],
1061                      (unsigned long long)uuid[UI_BITMAP],
1062                      (unsigned long long)uuid[UI_HISTORY_START],
1063                      (unsigned long long)uuid[UI_HISTORY_END]);
1064                 put_ldev(mdev);
1065         } else {
1066                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1067                                 text,
1068                                 (unsigned long long)mdev->ed_uuid);
1069         }
1070 }
1071
1072 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1073 {
1074         struct drbd_socket *sock;
1075         struct p_rs_uuid *p;
1076         u64 uuid;
1077
1078         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1079
1080         uuid = mdev->ldev->md.uuid[UI_BITMAP];
1081         if (uuid && uuid != UUID_JUST_CREATED)
1082                 uuid = uuid + UUID_NEW_BM_OFFSET;
1083         else
1084                 get_random_bytes(&uuid, sizeof(u64));
1085         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1086         drbd_print_uuids(mdev, "updated sync UUID");
1087         drbd_md_sync(mdev);
1088
1089         sock = &mdev->tconn->data;
1090         p = drbd_prepare_command(mdev, sock);
1091         if (p) {
1092                 p->uuid = cpu_to_be64(uuid);
1093                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1094         }
1095 }
1096
1097 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1098 {
1099         struct drbd_socket *sock;
1100         struct p_sizes *p;
1101         sector_t d_size, u_size;
1102         int q_order_type, max_bio_size;
1103
1104         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1105                 D_ASSERT(mdev->ldev->backing_bdev);
1106                 d_size = drbd_get_max_capacity(mdev->ldev);
1107                 rcu_read_lock();
1108                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1109                 rcu_read_unlock();
1110                 q_order_type = drbd_queue_order_type(mdev);
1111                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1112                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1113                 put_ldev(mdev);
1114         } else {
1115                 d_size = 0;
1116                 u_size = 0;
1117                 q_order_type = QUEUE_ORDERED_NONE;
1118                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1119         }
1120
1121         sock = &mdev->tconn->data;
1122         p = drbd_prepare_command(mdev, sock);
1123         if (!p)
1124                 return -EIO;
1125
1126         if (mdev->tconn->agreed_pro_version <= 94)
1127                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1128         else if (mdev->tconn->agreed_pro_version < 100)
1129                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1130
1131         p->d_size = cpu_to_be64(d_size);
1132         p->u_size = cpu_to_be64(u_size);
1133         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1134         p->max_bio_size = cpu_to_be32(max_bio_size);
1135         p->queue_order_type = cpu_to_be16(q_order_type);
1136         p->dds_flags = cpu_to_be16(flags);
1137         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1138 }
1139
1140 /**
1141  * drbd_send_current_state() - Sends the drbd state to the peer
1142  * @mdev:       DRBD device.
1143  */
1144 int drbd_send_current_state(struct drbd_conf *mdev)
1145 {
1146         struct drbd_socket *sock;
1147         struct p_state *p;
1148
1149         sock = &mdev->tconn->data;
1150         p = drbd_prepare_command(mdev, sock);
1151         if (!p)
1152                 return -EIO;
1153         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1154         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1155 }
1156
1157 /**
1158  * drbd_send_state() - After a state change, sends the new state to the peer
1159  * @mdev:      DRBD device.
1160  * @state:     the state to send, not necessarily the current state.
1161  *
1162  * Each state change queues an "after_state_ch" work, which will eventually
1163  * send the resulting new state to the peer. If more state changes happen
1164  * between queuing and processing of the after_state_ch work, we still
1165  * want to send each intermediary state in the order it occurred.
1166  */
1167 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
1168 {
1169         struct drbd_socket *sock;
1170         struct p_state *p;
1171
1172         sock = &mdev->tconn->data;
1173         p = drbd_prepare_command(mdev, sock);
1174         if (!p)
1175                 return -EIO;
1176         p->state = cpu_to_be32(state.i); /* Within the send mutex */
1177         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1178 }
1179
1180 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1181 {
1182         struct drbd_socket *sock;
1183         struct p_req_state *p;
1184
1185         sock = &mdev->tconn->data;
1186         p = drbd_prepare_command(mdev, sock);
1187         if (!p)
1188                 return -EIO;
1189         p->mask = cpu_to_be32(mask.i);
1190         p->val = cpu_to_be32(val.i);
1191         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1192 }
1193
1194 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1195 {
1196         enum drbd_packet cmd;
1197         struct drbd_socket *sock;
1198         struct p_req_state *p;
1199
1200         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1201         sock = &tconn->data;
1202         p = conn_prepare_command(tconn, sock);
1203         if (!p)
1204                 return -EIO;
1205         p->mask = cpu_to_be32(mask.i);
1206         p->val = cpu_to_be32(val.i);
1207         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1208 }
1209
1210 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1211 {
1212         struct drbd_socket *sock;
1213         struct p_req_state_reply *p;
1214
1215         sock = &mdev->tconn->meta;
1216         p = drbd_prepare_command(mdev, sock);
1217         if (p) {
1218                 p->retcode = cpu_to_be32(retcode);
1219                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1220         }
1221 }
1222
1223 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1224 {
1225         struct drbd_socket *sock;
1226         struct p_req_state_reply *p;
1227         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1228
1229         sock = &tconn->meta;
1230         p = conn_prepare_command(tconn, sock);
1231         if (p) {
1232                 p->retcode = cpu_to_be32(retcode);
1233                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1234         }
1235 }
1236
1237 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1238 {
1239         BUG_ON(code & ~0xf);
1240         p->encoding = (p->encoding & ~0xf) | code;
1241 }
1242
1243 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1244 {
1245         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1246 }
1247
1248 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1249 {
1250         BUG_ON(n & ~0x7);
1251         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1252 }
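/*
 * Illustrative counterparts to the dcbp_set_*() helpers above, shown only
 * to make the encoding byte layout explicit: bitmap code in bits 0-3,
 * pad bits in bits 4-6, "first run is set" flag in bit 7.  The receive
 * path has its own decoders; treat these example_* names as hypothetical.
 */
static enum drbd_bitmap_code example_dcbp_get_code(struct p_compressed_bm *p)
{
        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int example_dcbp_get_start(struct p_compressed_bm *p)
{
        return (p->encoding & 0x80) != 0;
}

static int example_dcbp_get_pad_bits(struct p_compressed_bm *p)
{
        return (p->encoding >> 4) & 0x7;
}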
1253
1254 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1255                          struct p_compressed_bm *p,
1256                          unsigned int size,
1257                          struct bm_xfer_ctx *c)
1258 {
1259         struct bitstream bs;
1260         unsigned long plain_bits;
1261         unsigned long tmp;
1262         unsigned long rl;
1263         unsigned len;
1264         unsigned toggle;
1265         int bits, use_rle;
1266
1267         /* may we use this feature? */
1268         rcu_read_lock();
1269         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1270         rcu_read_unlock();
1271         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1272                 return 0;
1273
1274         if (c->bit_offset >= c->bm_bits)
1275                 return 0; /* nothing to do. */
1276
1277         /* use at most this many bytes */
1278         bitstream_init(&bs, p->code, size, 0);
1279         memset(p->code, 0, size);
1280         /* plain bits covered in this code string */
1281         plain_bits = 0;
1282
1283         /* p->encoding & 0x80 stores whether the first run length is set.
1284          * bit offset is implicit.
1285          * start with toggle == 2 to be able to tell the first iteration */
1286         toggle = 2;
1287
1288         /* see how many plain bits we can stuff into one packet
1289          * using RLE and VLI. */
1290         do {
1291                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1292                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1293                 if (tmp == -1UL)
1294                         tmp = c->bm_bits;
1295                 rl = tmp - c->bit_offset;
1296
1297                 if (toggle == 2) { /* first iteration */
1298                         if (rl == 0) {
1299                                 /* the first checked bit was set,
1300                                  * store start value, */
1301                                 dcbp_set_start(p, 1);
1302                                 /* but skip encoding of zero run length */
1303                                 toggle = !toggle;
1304                                 continue;
1305                         }
1306                         dcbp_set_start(p, 0);
1307                 }
1308
1309                 /* paranoia: catch zero runlength.
1310                  * can only happen if bitmap is modified while we scan it. */
1311                 if (rl == 0) {
1312                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1313                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1314                         return -1;
1315                 }
1316
1317                 bits = vli_encode_bits(&bs, rl);
1318                 if (bits == -ENOBUFS) /* buffer full */
1319                         break;
1320                 if (bits <= 0) {
1321                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1322                         return 0;
1323                 }
1324
1325                 toggle = !toggle;
1326                 plain_bits += rl;
1327                 c->bit_offset = tmp;
1328         } while (c->bit_offset < c->bm_bits);
1329
1330         len = bs.cur.b - p->code + !!bs.cur.bit;
1331
1332         if (plain_bits < (len << 3)) {
1333                 /* incompressible with this method.
1334                  * we need to rewind both word and bit position. */
1335                 c->bit_offset -= plain_bits;
1336                 bm_xfer_ctx_bit_to_word_offset(c);
1337                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1338                 return 0;
1339         }
1340
1341         /* RLE + VLI was able to compress it just fine.
1342          * update c->word_offset. */
1343         bm_xfer_ctx_bit_to_word_offset(c);
1344
1345         /* store pad_bits */
1346         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1347
1348         return len;
1349 }
1350
1351 /**
1352  * send_bitmap_rle_or_plain
1353  *
1354  * Return 0 when done, 1 when another iteration is needed, and a negative error
1355  * code upon failure.
1356  */
1357 static int
1358 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1359 {
1360         struct drbd_socket *sock = &mdev->tconn->data;
1361         unsigned int header_size = drbd_header_size(mdev->tconn);
1362         struct p_compressed_bm *p = sock->sbuf + header_size;
1363         int len, err;
1364
1365         len = fill_bitmap_rle_bits(mdev, p,
1366                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1367         if (len < 0)
1368                 return -EIO;
1369
1370         if (len) {
1371                 dcbp_set_code(p, RLE_VLI_Bits);
1372                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1373                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1374                                      NULL, 0);
1375                 c->packets[0]++;
1376                 c->bytes[0] += header_size + sizeof(*p) + len;
1377
1378                 if (c->bit_offset >= c->bm_bits)
1379                         len = 0; /* DONE */
1380         } else {
1381                 /* was not compressible.
1382                  * send a buffer full of plain text bits instead. */
1383                 unsigned int data_size;
1384                 unsigned long num_words;
1385                 unsigned long *p = sock->sbuf + header_size;
1386
1387                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1388                 num_words = min_t(size_t, data_size / sizeof(*p),
1389                                   c->bm_words - c->word_offset);
1390                 len = num_words * sizeof(*p);
1391                 if (len)
1392                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1393                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1394                 c->word_offset += num_words;
1395                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1396
1397                 c->packets[1]++;
1398                 c->bytes[1] += header_size + len;
1399
1400                 if (c->bit_offset > c->bm_bits)
1401                         c->bit_offset = c->bm_bits;
1402         }
1403         if (!err) {
1404                 if (len == 0) {
1405                         INFO_bm_xfer_stats(mdev, "send", c);
1406                         return 0;
1407                 } else
1408                         return 1;
1409         }
1410         return -EIO;
1411 }
1412
1413 /* See the comment at receive_bitmap() */
1414 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1415 {
1416         struct bm_xfer_ctx c;
1417         int err;
1418
1419         if (!expect(mdev->bitmap))
1420                 return false;
1421
1422         if (get_ldev(mdev)) {
1423                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1424                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1425                         drbd_bm_set_all(mdev);
1426                         if (drbd_bm_write(mdev)) {
1427                                 /* write_bm did fail! Leave the full sync flag set in the meta data,
1428                                  * but otherwise process as per normal - we need to tell the other
1429                                  * side that a full resync is required! */
1430                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1431                         } else {
1432                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1433                                 drbd_md_sync(mdev);
1434                         }
1435                 }
1436                 put_ldev(mdev);
1437         }
1438
1439         c = (struct bm_xfer_ctx) {
1440                 .bm_bits = drbd_bm_bits(mdev),
1441                 .bm_words = drbd_bm_words(mdev),
1442         };
1443
1444         do {
1445                 err = send_bitmap_rle_or_plain(mdev, &c);
1446         } while (err > 0);
1447
1448         return err == 0;
1449 }
1450
1451 int drbd_send_bitmap(struct drbd_conf *mdev)
1452 {
1453         struct drbd_socket *sock = &mdev->tconn->data;
1454         int err = -1;
1455
1456         mutex_lock(&sock->mutex);
1457         if (sock->socket)
1458                 err = !_drbd_send_bitmap(mdev);
1459         mutex_unlock(&sock->mutex);
1460         return err;
1461 }
1462
1463 void drbd_send_b_ack(struct drbd_tconn *tconn, u32 barrier_nr, u32 set_size)
1464 {
1465         struct drbd_socket *sock;
1466         struct p_barrier_ack *p;
1467
1468         if (tconn->cstate < C_WF_REPORT_PARAMS)
1469                 return;
1470
1471         sock = &tconn->meta;
1472         p = conn_prepare_command(tconn, sock);
1473         if (!p)
1474                 return;
1475         p->barrier = barrier_nr;
1476         p->set_size = cpu_to_be32(set_size);
1477         conn_send_command(tconn, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1478 }
1479
1480 /**
1481  * _drbd_send_ack() - Sends an ack packet
1482  * @mdev:       DRBD device.
1483  * @cmd:        Packet command code.
1484  * @sector:     sector, needs to be in big endian byte order
1485  * @blksize:    size in bytes, needs to be in big endian byte order
1486  * @block_id:   Id, big endian byte order
1487  */
1488 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1489                           u64 sector, u32 blksize, u64 block_id)
1490 {
1491         struct drbd_socket *sock;
1492         struct p_block_ack *p;
1493
1494         if (mdev->state.conn < C_CONNECTED)
1495                 return -EIO;
1496
1497         sock = &mdev->tconn->meta;
1498         p = drbd_prepare_command(mdev, sock);
1499         if (!p)
1500                 return -EIO;
1501         p->sector = sector;
1502         p->block_id = block_id;
1503         p->blksize = blksize;
1504         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1505         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1506 }
1507
1508 /* dp->sector and dp->block_id already/still in network byte order,
1509  * data_size is payload size according to dp->head,
1510  * and may need to be corrected for digest size. */
1511 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1512                       struct p_data *dp, int data_size)
1513 {
1514         if (mdev->tconn->peer_integrity_tfm)
1515                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1516         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1517                        dp->block_id);
1518 }
1519
1520 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1521                       struct p_block_req *rp)
1522 {
1523         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1524 }
1525
1526 /**
1527  * drbd_send_ack() - Sends an ack packet
1528  * @mdev:       DRBD device
1529  * @cmd:        packet command code
1530  * @peer_req:   peer request
1531  */
1532 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1533                   struct drbd_peer_request *peer_req)
1534 {
1535         return _drbd_send_ack(mdev, cmd,
1536                               cpu_to_be64(peer_req->i.sector),
1537                               cpu_to_be32(peer_req->i.size),
1538                               peer_req->block_id);
1539 }
1540
1541 /* This function misuses the block_id field to signal whether the blocks
1542  * are in sync or not. */
1543 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1544                      sector_t sector, int blksize, u64 block_id)
1545 {
1546         return _drbd_send_ack(mdev, cmd,
1547                               cpu_to_be64(sector),
1548                               cpu_to_be32(blksize),
1549                               cpu_to_be64(block_id));
1550 }
1551
1552 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1553                        sector_t sector, int size, u64 block_id)
1554 {
1555         struct drbd_socket *sock;
1556         struct p_block_req *p;
1557
1558         sock = &mdev->tconn->data;
1559         p = drbd_prepare_command(mdev, sock);
1560         if (!p)
1561                 return -EIO;
1562         p->sector = cpu_to_be64(sector);
1563         p->block_id = block_id;
1564         p->blksize = cpu_to_be32(size);
1565         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1566 }
1567
1568 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1569                             void *digest, int digest_size, enum drbd_packet cmd)
1570 {
1571         struct drbd_socket *sock;
1572         struct p_block_req *p;
1573
1574         /* FIXME: Put the digest into the preallocated socket buffer.  */
1575
1576         sock = &mdev->tconn->data;
1577         p = drbd_prepare_command(mdev, sock);
1578         if (!p)
1579                 return -EIO;
1580         p->sector = cpu_to_be64(sector);
1581         p->block_id = ID_SYNCER /* unused */;
1582         p->blksize = cpu_to_be32(size);
1583         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1584                                  digest, digest_size);
1585 }
1586
1587 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1588 {
1589         struct drbd_socket *sock;
1590         struct p_block_req *p;
1591
1592         sock = &mdev->tconn->data;
1593         p = drbd_prepare_command(mdev, sock);
1594         if (!p)
1595                 return -EIO;
1596         p->sector = cpu_to_be64(sector);
1597         p->block_id = ID_SYNCER /* unused */;
1598         p->blksize = cpu_to_be32(size);
1599         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1600 }
1601
1602 /* called on sndtimeo
1603  * returns false if we should retry,
1604  * true if we think connection is dead
1605  * true if we think the connection is dead
1606 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1607 {
1608         int drop_it;
1609         /* long elapsed = (long)(jiffies - mdev->last_received); */
1610
1611         drop_it =   tconn->meta.socket == sock
1612                 || !tconn->asender.task
1613                 || get_t_state(&tconn->asender) != RUNNING
1614                 || tconn->cstate < C_WF_REPORT_PARAMS;
1615
1616         if (drop_it)
1617                 return true;
1618
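        /* Each send timeout consumes one "knock out" attempt.  As long as
         * ko_count has not reached zero, ask the peer for a ping and let the
         * caller retry; once it hits zero, give up on the connection. */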
1619         drop_it = !--tconn->ko_count;
1620         if (!drop_it) {
1621                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1622                          current->comm, current->pid, tconn->ko_count);
1623                 request_ping(tconn);
1624         }
1625
1626         return drop_it; /* && (mdev->state == R_PRIMARY) */
1627 }
1628
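/* Flag the connection as congested once the data socket's send queue grows
 * beyond 4/5 of its send buffer.  The flag is reported via drbd_congested()
 * and cleared again at the end of drbd_send() / _drbd_send_page(). */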
1629 static void drbd_update_congested(struct drbd_tconn *tconn)
1630 {
1631         struct sock *sk = tconn->data.socket->sk;
1632         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1633                 set_bit(NET_CONGESTED, &tconn->flags);
1634 }
1635
1636 /* The idea of sendpage seems to be to put some kind of reference
1637  * to the page into the skb, and to hand it over to the NIC. In
1638  * this process get_page() gets called.
1639  *
1640  * As soon as the page was really sent over the network put_page()
1641  * gets called by some part of the network layer. [ NIC driver? ]
1642  *
1643  * [ get_page() / put_page() increment/decrement the count. If count
1644  *   reaches 0 the page will be freed. ]
1645  *
1646  * This works nicely with pages from FSs.
1647  * But this means that in protocol A we might signal IO completion too early!
1648  *
1649  * In order not to corrupt data during a resync we must make sure
1650  * that we do not reuse our own buffer pages (EEs) too early; therefore
1651  * we have the net_ee list.
1652  *
1653  * XFS seems to have problems, still, it submits pages with page_count == 0!
1654  * As a workaround, we disable sendpage on pages
1655  * with page_count == 0 or PageSlab.
1656  */
1657 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1658                               int offset, size_t size, unsigned msg_flags)
1659 {
1660         struct socket *socket;
1661         void *addr;
1662         int err;
1663
1664         socket = mdev->tconn->data.socket;
1665         addr = kmap(page) + offset;
1666         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1667         kunmap(page);
1668         if (!err)
1669                 mdev->send_cnt += size >> 9;
1670         return err;
1671 }
1672
1673 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1674                     int offset, size_t size, unsigned msg_flags)
1675 {
1676         struct socket *socket = mdev->tconn->data.socket;
1677         mm_segment_t oldfs = get_fs();
1678         int len = size;
1679         int err = -EIO;
1680
1681         /* e.g. XFS meta- & log-data is in slab pages, which have a
1682          * page_count of 0 and/or have PageSlab() set.
1683          * we cannot use send_page for those, as that does get_page();
1684          * put_page(); and would cause either a VM_BUG directly, or
1685          * __page_cache_release a page that would actually still be referenced
1686          * by someone, leading to some obscure delayed Oops somewhere else. */
1687         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1688                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1689
1690         msg_flags |= MSG_NOSIGNAL;
1691         drbd_update_congested(mdev->tconn);
1692         set_fs(KERNEL_DS);
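        /* Loop until the whole range has been handed to the socket;
         * sendpage() may accept only part of it.  On -EAGAIN (send timeout)
         * we ask we_should_drop_the_connection() whether to retry or to
         * give up. */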
1693         do {
1694                 int sent;
1695
1696                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1697                 if (sent <= 0) {
1698                         if (sent == -EAGAIN) {
1699                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1700                                         break;
1701                                 continue;
1702                         }
1703                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1704                              __func__, (int)size, len, sent);
1705                         if (sent < 0)
1706                                 err = sent;
1707                         break;
1708                 }
1709                 len    -= sent;
1710                 offset += sent;
1711         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1712         set_fs(oldfs);
1713         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1714
1715         if (len == 0) {
1716                 err = 0;
1717                 mdev->send_cnt += size >> 9;
1718         }
1719         return err;
1720 }
1721
1722 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1723 {
1724         struct bio_vec *bvec;
1725         int i;
1726         /* hint all but last page with MSG_MORE */
1727         bio_for_each_segment(bvec, bio, i) {
1728                 int err;
1729
1730                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1731                                          bvec->bv_offset, bvec->bv_len,
1732                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1733                 if (err)
1734                         return err;
1735         }
1736         return 0;
1737 }
1738
1739 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1740 {
1741         struct bio_vec *bvec;
1742         int i;
1743         /* hint all but last page with MSG_MORE */
1744         bio_for_each_segment(bvec, bio, i) {
1745                 int err;
1746
1747                 err = _drbd_send_page(mdev, bvec->bv_page,
1748                                       bvec->bv_offset, bvec->bv_len,
1749                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1750                 if (err)
1751                         return err;
1752         }
1753         return 0;
1754 }
1755
1756 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1757                             struct drbd_peer_request *peer_req)
1758 {
1759         struct page *page = peer_req->pages;
1760         unsigned len = peer_req->i.size;
1761         int err;
1762
1763         /* hint all but last page with MSG_MORE */
1764         page_chain_for_each(page) {
1765                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1766
1767                 err = _drbd_send_page(mdev, page, 0, l,
1768                                       page_chain_next(page) ? MSG_MORE : 0);
1769                 if (err)
1770                         return err;
1771                 len -= l;
1772         }
1773         return 0;
1774 }
1775
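/* Translate the REQ_* flags of the local bio into DP_* flags for the wire.
 * For peers speaking a protocol version older than 95, only the REQ_SYNC
 * hint is forwarded. */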
1776 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1777 {
1778         if (mdev->tconn->agreed_pro_version >= 95)
1779                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1780                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1781                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1782                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1783         else
1784                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1785 }
1786
1787 /* Used to send write requests
1788  * R_PRIMARY -> Peer    (P_DATA)
1789  */
1790 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1791 {
1792         struct drbd_socket *sock;
1793         struct p_data *p;
1794         unsigned int dp_flags = 0;
1795         int dgs;
1796         int err;
1797
1798         sock = &mdev->tconn->data;
1799         p = drbd_prepare_command(mdev, sock);
1800         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1801
1802         if (!p)
1803                 return -EIO;
1804         p->sector = cpu_to_be64(req->i.sector);
1805         p->block_id = (unsigned long)req;
1806         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1807         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1808         if (mdev->state.conn >= C_SYNC_SOURCE &&
1809             mdev->state.conn <= C_PAUSED_SYNC_T)
1810                 dp_flags |= DP_MAY_SET_IN_SYNC;
1811         if (mdev->tconn->agreed_pro_version >= 100) {
1812                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1813                         dp_flags |= DP_SEND_RECEIVE_ACK;
1814                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1815                         dp_flags |= DP_SEND_WRITE_ACK;
1816         }
1817         p->dp_flags = cpu_to_be32(dp_flags);
1818         if (dgs)
1819                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1820         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1821         if (!err) {
1822                 /* For protocol A, we have to memcpy the payload into
1823                  * socket buffers, as we may complete right away
1824                  * as soon as we handed it over to tcp, at which point the data
1825                  * pages may become invalid.
1826                  *
1827                  * With data integrity enabled, we copy it as well, so that even
1828                  * if the bio pages are still being modified, the data on the
1829                  * wire does not change.  Thus, if the digest checks out OK after
1830                  * sending on this side but does not match on the receiving side,
1831                  * we have certainly detected corruption that happened elsewhere.
1832                  */
1833                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1834                         err = _drbd_send_bio(mdev, req->master_bio);
1835                 else
1836                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1837
1838                 /* double check digest, sometimes buffers have been modified in flight. */
1839                 if (dgs > 0 && dgs <= 64) {
1840                         /* 64 byte, 512 bit, is the largest digest size
1841                          * currently supported in kernel crypto. */
1842                         unsigned char digest[64];
1843                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1844                         if (memcmp(p + 1, digest, dgs)) {
1845                                 dev_warn(DEV,
1846                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1847                                         (unsigned long long)req->i.sector, req->i.size);
1848                         }
1849                 } /* else if (dgs > 64) {
1850                      ... Be noisy about digest too large ...
1851                 } */
1852         }
1853         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1854
1855         return err;
1856 }
1857
1858 /* answer packet, used to send data back for read requests:
1859  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1860  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1861  */
1862 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1863                     struct drbd_peer_request *peer_req)
1864 {
1865         struct drbd_socket *sock;
1866         struct p_data *p;
1867         int err;
1868         int dgs;
1869
1870         sock = &mdev->tconn->data;
1871         p = drbd_prepare_command(mdev, sock);
1872
1873         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1874
1875         if (!p)
1876                 return -EIO;
1877         p->sector = cpu_to_be64(peer_req->i.sector);
1878         p->block_id = peer_req->block_id;
1879         p->seq_num = 0;  /* unused */
1880         p->dp_flags = 0;
1881         if (dgs)
1882                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1883         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1884         if (!err)
1885                 err = _drbd_send_zc_ee(mdev, peer_req);
1886         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1887
1888         return err;
1889 }
1890
1891 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1892 {
1893         struct drbd_socket *sock;
1894         struct p_block_desc *p;
1895
1896         sock = &mdev->tconn->data;
1897         p = drbd_prepare_command(mdev, sock);
1898         if (!p)
1899                 return -EIO;
1900         p->sector = cpu_to_be64(req->i.sector);
1901         p->blksize = cpu_to_be32(req->i.size);
1902         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1903 }
1904
1905 /*
1906   drbd_send distinguishes two cases:
1907
1908   Packets sent via the data socket "sock"
1909   and packets sent via the meta data socket "msock"
1910
1911                     sock                      msock
1912   -----------------+-------------------------+------------------------------
1913   timeout           conf.timeout / 2          conf.timeout / 2
1914   timeout action    send a ping via msock     Abort communication
1915                                               and close all sockets
1916 */
1917
1918 /*
1919  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1920  */
1921 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1922               void *buf, size_t size, unsigned msg_flags)
1923 {
1924         struct kvec iov;
1925         struct msghdr msg;
1926         int rv, sent = 0;
1927
1928         if (!sock)
1929                 return -EBADR;
1930
1931         /* THINK  if (signal_pending) return ... ? */
1932
1933         iov.iov_base = buf;
1934         iov.iov_len  = size;
1935
1936         msg.msg_name       = NULL;
1937         msg.msg_namelen    = 0;
1938         msg.msg_control    = NULL;
1939         msg.msg_controllen = 0;
1940         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1941
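        /* The ko-count and congestion handling apply only to the data socket;
         * a send timeout on the meta socket makes we_should_drop_the_connection()
         * give up immediately. */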
1942         if (sock == tconn->data.socket) {
1943                 rcu_read_lock();
1944                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1945                 rcu_read_unlock();
1946                 drbd_update_congested(tconn);
1947         }
1948         do {
1949                 /* STRANGE
1950                  * tcp_sendmsg does _not_ use its size parameter at all ?
1951                  *
1952                  * -EAGAIN on timeout, -EINTR on signal.
1953                  */
1954 /* THINK
1955  * do we need to block DRBD_SIG if sock == &meta.socket ??
1956  * otherwise wake_asender() might interrupt some send_*Ack !
1957  */
1958                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1959                 if (rv == -EAGAIN) {
1960                         if (we_should_drop_the_connection(tconn, sock))
1961                                 break;
1962                         else
1963                                 continue;
1964                 }
1965                 if (rv == -EINTR) {
1966                         flush_signals(current);
1967                         rv = 0;
1968                 }
1969                 if (rv < 0)
1970                         break;
1971                 sent += rv;
1972                 iov.iov_base += rv;
1973                 iov.iov_len  -= rv;
1974         } while (sent < size);
1975
1976         if (sock == tconn->data.socket)
1977                 clear_bit(NET_CONGESTED, &tconn->flags);
1978
1979         if (rv <= 0) {
1980                 if (rv != -EAGAIN) {
1981                         conn_err(tconn, "%s_sendmsg returned %d\n",
1982                                  sock == tconn->meta.socket ? "msock" : "sock",
1983                                  rv);
1984                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1985                 } else
1986                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1987         }
1988
1989         return sent;
1990 }
1991
1992 /**
1993  * drbd_send_all  -  Send an entire buffer
1994  *
1995  * Returns 0 upon success and a negative error value otherwise.
1996  */
1997 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1998                   size_t size, unsigned msg_flags)
1999 {
2000         int err;
2001
2002         err = drbd_send(tconn, sock, buffer, size, msg_flags);
2003         if (err < 0)
2004                 return err;
2005         if (err != size)
2006                 return -EIO;
2007         return 0;
2008 }
2009
2010 static int drbd_open(struct block_device *bdev, fmode_t mode)
2011 {
2012         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2013         unsigned long flags;
2014         int rv = 0;
2015
2016         mutex_lock(&drbd_main_mutex);
2017         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2018         /* to have a stable mdev->state.role
2019          * and no race with updating open_cnt */
2020
2021         if (mdev->state.role != R_PRIMARY) {
2022                 if (mode & FMODE_WRITE)
2023                         rv = -EROFS;
2024                 else if (!allow_oos)
2025                         rv = -EMEDIUMTYPE;
2026         }
2027
2028         if (!rv)
2029                 mdev->open_cnt++;
2030         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2031         mutex_unlock(&drbd_main_mutex);
2032
2033         return rv;
2034 }
2035
2036 static int drbd_release(struct gendisk *gd, fmode_t mode)
2037 {
2038         struct drbd_conf *mdev = gd->private_data;
2039         mutex_lock(&drbd_main_mutex);
2040         mdev->open_cnt--;
2041         mutex_unlock(&drbd_main_mutex);
2042         return 0;
2043 }
2044
2045 static void drbd_set_defaults(struct drbd_conf *mdev)
2046 {
2047         /* Beware! The actual layout differs
2048          * between big endian and little endian */
2049         mdev->state = (union drbd_dev_state) {
2050                 { .role = R_SECONDARY,
2051                   .peer = R_UNKNOWN,
2052                   .conn = C_STANDALONE,
2053                   .disk = D_DISKLESS,
2054                   .pdsk = D_UNKNOWN,
2055                 } };
2056 }
2057
2058 void drbd_init_set_defaults(struct drbd_conf *mdev)
2059 {
2060         /* the memset(,0,) did most of this.
2061          * note: only assignments, no allocation in here */
2062
2063         drbd_set_defaults(mdev);
2064
2065         atomic_set(&mdev->ap_bio_cnt, 0);
2066         atomic_set(&mdev->ap_pending_cnt, 0);
2067         atomic_set(&mdev->rs_pending_cnt, 0);
2068         atomic_set(&mdev->unacked_cnt, 0);
2069         atomic_set(&mdev->local_cnt, 0);
2070         atomic_set(&mdev->pp_in_use_by_net, 0);
2071         atomic_set(&mdev->rs_sect_in, 0);
2072         atomic_set(&mdev->rs_sect_ev, 0);
2073         atomic_set(&mdev->ap_in_flight, 0);
2074         atomic_set(&mdev->md_io_in_use, 0);
2075
2076         mutex_init(&mdev->own_state_mutex);
2077         mdev->state_mutex = &mdev->own_state_mutex;
2078
2079         spin_lock_init(&mdev->al_lock);
2080         spin_lock_init(&mdev->peer_seq_lock);
2081
2082         INIT_LIST_HEAD(&mdev->active_ee);
2083         INIT_LIST_HEAD(&mdev->sync_ee);
2084         INIT_LIST_HEAD(&mdev->done_ee);
2085         INIT_LIST_HEAD(&mdev->read_ee);
2086         INIT_LIST_HEAD(&mdev->net_ee);
2087         INIT_LIST_HEAD(&mdev->resync_reads);
2088         INIT_LIST_HEAD(&mdev->resync_work.list);
2089         INIT_LIST_HEAD(&mdev->unplug_work.list);
2090         INIT_LIST_HEAD(&mdev->go_diskless.list);
2091         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2092         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2093         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2094
2095         mdev->resync_work.cb  = w_resync_timer;
2096         mdev->unplug_work.cb  = w_send_write_hint;
2097         mdev->go_diskless.cb  = w_go_diskless;
2098         mdev->md_sync_work.cb = w_md_sync;
2099         mdev->bm_io_work.w.cb = w_bitmap_io;
2100         mdev->start_resync_work.cb = w_start_resync;
2101
2102         mdev->resync_work.mdev  = mdev;
2103         mdev->unplug_work.mdev  = mdev;
2104         mdev->go_diskless.mdev  = mdev;
2105         mdev->md_sync_work.mdev = mdev;
2106         mdev->bm_io_work.w.mdev = mdev;
2107         mdev->start_resync_work.mdev = mdev;
2108
2109         init_timer(&mdev->resync_timer);
2110         init_timer(&mdev->md_sync_timer);
2111         init_timer(&mdev->start_resync_timer);
2112         init_timer(&mdev->request_timer);
2113         mdev->resync_timer.function = resync_timer_fn;
2114         mdev->resync_timer.data = (unsigned long) mdev;
2115         mdev->md_sync_timer.function = md_sync_timer_fn;
2116         mdev->md_sync_timer.data = (unsigned long) mdev;
2117         mdev->start_resync_timer.function = start_resync_timer_fn;
2118         mdev->start_resync_timer.data = (unsigned long) mdev;
2119         mdev->request_timer.function = request_timer_fn;
2120         mdev->request_timer.data = (unsigned long) mdev;
2121
2122         init_waitqueue_head(&mdev->misc_wait);
2123         init_waitqueue_head(&mdev->state_wait);
2124         init_waitqueue_head(&mdev->ee_wait);
2125         init_waitqueue_head(&mdev->al_wait);
2126         init_waitqueue_head(&mdev->seq_wait);
2127
2128         mdev->resync_wenr = LC_FREE;
2129         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2130         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2131 }
2132
2133 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2134 {
2135         int i;
2136         if (mdev->tconn->receiver.t_state != NONE)
2137                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2138                                 mdev->tconn->receiver.t_state);
2139
2140         mdev->al_writ_cnt  =
2141         mdev->bm_writ_cnt  =
2142         mdev->read_cnt     =
2143         mdev->recv_cnt     =
2144         mdev->send_cnt     =
2145         mdev->writ_cnt     =
2146         mdev->p_size       =
2147         mdev->rs_start     =
2148         mdev->rs_total     =
2149         mdev->rs_failed    = 0;
2150         mdev->rs_last_events = 0;
2151         mdev->rs_last_sect_ev = 0;
2152         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2153                 mdev->rs_mark_left[i] = 0;
2154                 mdev->rs_mark_time[i] = 0;
2155         }
2156         D_ASSERT(mdev->tconn->net_conf == NULL);
2157
2158         drbd_set_my_capacity(mdev, 0);
2159         if (mdev->bitmap) {
2160                 /* maybe never allocated. */
2161                 drbd_bm_resize(mdev, 0, 1);
2162                 drbd_bm_cleanup(mdev);
2163         }
2164
2165         drbd_free_bc(mdev->ldev);
2166         mdev->ldev = NULL;
2167
2168         clear_bit(AL_SUSPENDED, &mdev->flags);
2169
2170         D_ASSERT(list_empty(&mdev->active_ee));
2171         D_ASSERT(list_empty(&mdev->sync_ee));
2172         D_ASSERT(list_empty(&mdev->done_ee));
2173         D_ASSERT(list_empty(&mdev->read_ee));
2174         D_ASSERT(list_empty(&mdev->net_ee));
2175         D_ASSERT(list_empty(&mdev->resync_reads));
2176         D_ASSERT(list_empty(&mdev->tconn->sender_work.q));
2177         D_ASSERT(list_empty(&mdev->resync_work.list));
2178         D_ASSERT(list_empty(&mdev->unplug_work.list));
2179         D_ASSERT(list_empty(&mdev->go_diskless.list));
2180
2181         drbd_set_defaults(mdev);
2182 }
2183
2184
2185 static void drbd_destroy_mempools(void)
2186 {
2187         struct page *page;
2188
2189         while (drbd_pp_pool) {
2190                 page = drbd_pp_pool;
2191                 drbd_pp_pool = (struct page *)page_private(page);
2192                 __free_page(page);
2193                 drbd_pp_vacant--;
2194         }
2195
2196         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2197
2198         if (drbd_md_io_bio_set)
2199                 bioset_free(drbd_md_io_bio_set);
2200         if (drbd_md_io_page_pool)
2201                 mempool_destroy(drbd_md_io_page_pool);
2202         if (drbd_ee_mempool)
2203                 mempool_destroy(drbd_ee_mempool);
2204         if (drbd_request_mempool)
2205                 mempool_destroy(drbd_request_mempool);
2206         if (drbd_ee_cache)
2207                 kmem_cache_destroy(drbd_ee_cache);
2208         if (drbd_request_cache)
2209                 kmem_cache_destroy(drbd_request_cache);
2210         if (drbd_bm_ext_cache)
2211                 kmem_cache_destroy(drbd_bm_ext_cache);
2212         if (drbd_al_ext_cache)
2213                 kmem_cache_destroy(drbd_al_ext_cache);
2214
2215         drbd_md_io_bio_set   = NULL;
2216         drbd_md_io_page_pool = NULL;
2217         drbd_ee_mempool      = NULL;
2218         drbd_request_mempool = NULL;
2219         drbd_ee_cache        = NULL;
2220         drbd_request_cache   = NULL;
2221         drbd_bm_ext_cache    = NULL;
2222         drbd_al_ext_cache    = NULL;
2223
2224         return;
2225 }
2226
2227 static int drbd_create_mempools(void)
2228 {
2229         struct page *page;
2230         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2231         int i;
2232
2233         /* prepare our caches and mempools */
2234         drbd_request_mempool = NULL;
2235         drbd_ee_cache        = NULL;
2236         drbd_request_cache   = NULL;
2237         drbd_bm_ext_cache    = NULL;
2238         drbd_al_ext_cache    = NULL;
2239         drbd_pp_pool         = NULL;
2240         drbd_md_io_page_pool = NULL;
2241         drbd_md_io_bio_set   = NULL;
2242
2243         /* caches */
2244         drbd_request_cache = kmem_cache_create(
2245                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2246         if (drbd_request_cache == NULL)
2247                 goto Enomem;
2248
2249         drbd_ee_cache = kmem_cache_create(
2250                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2251         if (drbd_ee_cache == NULL)
2252                 goto Enomem;
2253
2254         drbd_bm_ext_cache = kmem_cache_create(
2255                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2256         if (drbd_bm_ext_cache == NULL)
2257                 goto Enomem;
2258
2259         drbd_al_ext_cache = kmem_cache_create(
2260                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2261         if (drbd_al_ext_cache == NULL)
2262                 goto Enomem;
2263
2264         /* mempools */
2265         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2266         if (drbd_md_io_bio_set == NULL)
2267                 goto Enomem;
2268
2269         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2270         if (drbd_md_io_page_pool == NULL)
2271                 goto Enomem;
2272
2273         drbd_request_mempool = mempool_create(number,
2274                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2275         if (drbd_request_mempool == NULL)
2276                 goto Enomem;
2277
2278         drbd_ee_mempool = mempool_create(number,
2279                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2280         if (drbd_ee_mempool == NULL)
2281                 goto Enomem;
2282
2283         /* drbd's page pool */
2284         spin_lock_init(&drbd_pp_lock);
2285
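        /* Pre-allocate "number" pages and chain them into drbd_pp_pool,
         * (ab)using the page_private field as the "next" pointer. */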
2286         for (i = 0; i < number; i++) {
2287                 page = alloc_page(GFP_HIGHUSER);
2288                 if (!page)
2289                         goto Enomem;
2290                 set_page_private(page, (unsigned long)drbd_pp_pool);
2291                 drbd_pp_pool = page;
2292         }
2293         drbd_pp_vacant = number;
2294
2295         return 0;
2296
2297 Enomem:
2298         drbd_destroy_mempools(); /* in case we allocated some */
2299         return -ENOMEM;
2300 }
2301
2302 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2303         void *unused)
2304 {
2305         /* just so we have it.  you never know what interesting things we
2306          * might want to do here some day...
2307          */
2308
2309         return NOTIFY_DONE;
2310 }
2311
2312 static struct notifier_block drbd_notifier = {
2313         .notifier_call = drbd_notify_sys,
2314 };
2315
2316 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2317 {
2318         int rr;
2319
2320         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2321         if (rr)
2322                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2323
2324         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2325         if (rr)
2326                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2327
2328         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2329         if (rr)
2330                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2331
2332         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2333         if (rr)
2334                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2335
2336         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2337         if (rr)
2338                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2339 }
2340
2341 /* caution. no locking. */
2342 void drbd_minor_destroy(struct kref *kref)
2343 {
2344         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2345         struct drbd_tconn *tconn = mdev->tconn;
2346
2347         del_timer_sync(&mdev->request_timer);
2348
2349         /* paranoia asserts */
2350         D_ASSERT(mdev->open_cnt == 0);
2351         /* end paranoia asserts */
2352
2353         /* cleanup stuff that may have been allocated during
2354          * device (re-)configuration or state changes */
2355
2356         if (mdev->this_bdev)
2357                 bdput(mdev->this_bdev);
2358
2359         drbd_free_bc(mdev->ldev);
2360         mdev->ldev = NULL;
2361
2362         drbd_release_all_peer_reqs(mdev);
2363
2364         lc_destroy(mdev->act_log);
2365         lc_destroy(mdev->resync);
2366
2367         kfree(mdev->p_uuid);
2368         /* mdev->p_uuid = NULL; */
2369
2370         if (mdev->bitmap) /* should no longer be there. */
2371                 drbd_bm_cleanup(mdev);
2372         __free_page(mdev->md_io_page);
2373         put_disk(mdev->vdisk);
2374         blk_cleanup_queue(mdev->rq_queue);
2375         kfree(mdev->rs_plan_s);
2376         kfree(mdev);
2377
2378         kref_put(&tconn->kref, &conn_destroy);
2379 }
2380
2381 /* One global retry thread, if we need to push back some bio and have it
2382  * reinserted through our make request function.
2383  */
2384 static struct retry_worker {
2385         struct workqueue_struct *wq;
2386         struct work_struct worker;
2387
2388         spinlock_t lock;
2389         struct list_head writes;
2390 } retry;
2391
2392 static void do_retry(struct work_struct *ws)
2393 {
2394         struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
2395         LIST_HEAD(writes);
2396         struct drbd_request *req, *tmp;
2397
2398         spin_lock_irq(&retry->lock);
2399         list_splice_init(&retry->writes, &writes);
2400         spin_unlock_irq(&retry->lock);
2401
2402         list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
2403                 struct drbd_conf *mdev = req->w.mdev;
2404                 struct bio *bio = req->master_bio;
2405                 unsigned long start_time = req->start_time;
2406
2407                 /* We have exclusive access to this request object.
2408                  * If it had not been RQ_POSTPONED, the code path which queued
2409                  * it here would have completed and freed it already.
2410                  */
2411                 mempool_free(req, drbd_request_mempool);
2412
2413                 /* A single suspended or otherwise blocking device may stall
2414                  * all others as well.  Fortunately, this code path is to
2415                  * recover from a situation that "should not happen":
2416                  * concurrent writes in multi-primary setup.
2417                  * In a "normal" lifecycle, this workqueue is supposed to be
2418                  * destroyed without ever doing anything.
2419                  * If it turns out to be an issue anyways, we can do per
2420                  * resource (replication group) or per device (minor) retry
2421                  * workqueues instead.
2422                  */
2423
2424                 /* We are not just doing generic_make_request(),
2425                  * as we want to keep the start_time information. */
2426                 do {
2427                         inc_ap_bio(mdev);
2428                 } while(__drbd_make_request(mdev, bio, start_time));
2429         }
2430 }
2431
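/* Hand a postponed request over to the retry worker, which will re-submit
 * its master bio through __drbd_make_request(), preserving the original
 * start_time. */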
2432 void drbd_restart_request(struct drbd_request *req)
2433 {
2434         unsigned long flags;
2435         spin_lock_irqsave(&retry.lock, flags);
2436         list_move_tail(&req->tl_requests, &retry.writes);
2437         spin_unlock_irqrestore(&retry.lock, flags);
2438
2439         /* Drop the extra reference that would otherwise
2440          * have been dropped by complete_master_bio.
2441          * do_retry() needs to grab a new one. */
2442         dec_ap_bio(req->w.mdev);
2443
2444         queue_work(retry.wq, &retry.worker);
2445 }
2446
2447
2448 static void drbd_cleanup(void)
2449 {
2450         unsigned int i;
2451         struct drbd_conf *mdev;
2452         struct drbd_tconn *tconn, *tmp;
2453
2454         unregister_reboot_notifier(&drbd_notifier);
2455
2456         /* first remove proc,
2457          * drbdsetup uses its presence to detect
2458          * whether DRBD is loaded.
2459          * If we would get stuck in proc removal,
2460          * but have netlink already deregistered,
2461          * some drbdsetup commands may wait forever
2462          * for an answer.
2463          */
2464         if (drbd_proc)
2465                 remove_proc_entry("drbd", NULL);
2466
2467         if (retry.wq)
2468                 destroy_workqueue(retry.wq);
2469
2470         drbd_genl_unregister();
2471
2472         idr_for_each_entry(&minors, mdev, i) {
2473                 idr_remove(&minors, mdev_to_minor(mdev));
2474                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2475                 del_gendisk(mdev->vdisk);
2476                 /* synchronize_rcu(); No other threads running at this point */
2477                 kref_put(&mdev->kref, &drbd_minor_destroy);
2478         }
2479
2480         /* not _rcu: no other updater anymore, genl already unregistered */
2481         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2482                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2483                 /* synchronize_rcu(); */
2484                 kref_put(&tconn->kref, &conn_destroy);
2485         }
2486
2487         drbd_destroy_mempools();
2488         unregister_blkdev(DRBD_MAJOR, "drbd");
2489
2490         idr_destroy(&minors);
2491
2492         printk(KERN_INFO "drbd: module cleanup done.\n");
2493 }
2494
2495 /**
2496  * drbd_congested() - Callback for pdflush
2497  * @congested_data:     User data
2498  * @bdi_bits:           Bits pdflush is currently interested in
2499  *
2500  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2501  */
2502 static int drbd_congested(void *congested_data, int bdi_bits)
2503 {
2504         struct drbd_conf *mdev = congested_data;
2505         struct request_queue *q;
2506         char reason = '-';
2507         int r = 0;
2508
2509         if (!may_inc_ap_bio(mdev)) {
2510                 /* DRBD has frozen IO */
2511                 r = bdi_bits;
2512                 reason = 'd';
2513                 goto out;
2514         }
2515
2516         if (get_ldev(mdev)) {
2517                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2518                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2519                 put_ldev(mdev);
2520                 if (r)
2521                         reason = 'b';
2522         }
2523
2524         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2525                 r |= (1 << BDI_async_congested);
2526                 reason = reason == 'b' ? 'a' : 'n';
2527         }
2528
2529 out:
2530         mdev->congestion_reason = reason;
2531         return r;
2532 }
2533
2534 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2535 {
2536         spin_lock_init(&wq->q_lock);
2537         INIT_LIST_HEAD(&wq->q);
2538         init_waitqueue_head(&wq->q_wait);
2539 }
2540
2541 struct drbd_tconn *conn_get_by_name(const char *name)
2542 {
2543         struct drbd_tconn *tconn;
2544
2545         if (!name || !name[0])
2546                 return NULL;
2547
2548         rcu_read_lock();
2549         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2550                 if (!strcmp(tconn->name, name)) {
2551                         kref_get(&tconn->kref);
2552                         goto found;
2553                 }
2554         }
2555         tconn = NULL;
2556 found:
2557         rcu_read_unlock();
2558         return tconn;
2559 }
2560
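/* Look up a connection by its local and peer network addresses.  On success
 * a reference on the tconn is taken; the caller must drop it again with
 * kref_put(&tconn->kref, &conn_destroy). */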
2561 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2562                                      void *peer_addr, int peer_addr_len)
2563 {
2564         struct drbd_tconn *tconn;
2565
2566         rcu_read_lock();
2567         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2568                 if (tconn->my_addr_len == my_addr_len &&
2569                     tconn->peer_addr_len == peer_addr_len &&
2570                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2571                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2572                         kref_get(&tconn->kref);
2573                         goto found;
2574                 }
2575         }
2576         tconn = NULL;
2577 found:
2578         rcu_read_unlock();
2579         return tconn;
2580 }
2581
2582 static int drbd_alloc_socket(struct drbd_socket *socket)
2583 {
2584         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2585         if (!socket->rbuf)
2586                 return -ENOMEM;
2587         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2588         if (!socket->sbuf)
2589                 return -ENOMEM;
2590         return 0;
2591 }
2592
2593 static void drbd_free_socket(struct drbd_socket *socket)
2594 {
2595         free_page((unsigned long) socket->sbuf);
2596         free_page((unsigned long) socket->rbuf);
2597 }
2598
2599 void conn_free_crypto(struct drbd_tconn *tconn)
2600 {
2601         drbd_free_sock(tconn);
2602
2603         crypto_free_hash(tconn->csums_tfm);
2604         crypto_free_hash(tconn->verify_tfm);
2605         crypto_free_hash(tconn->cram_hmac_tfm);
2606         crypto_free_hash(tconn->integrity_tfm);
2607         crypto_free_hash(tconn->peer_integrity_tfm);
2608         kfree(tconn->int_dig_in);
2609         kfree(tconn->int_dig_vv);
2610
2611         tconn->csums_tfm = NULL;
2612         tconn->verify_tfm = NULL;
2613         tconn->cram_hmac_tfm = NULL;
2614         tconn->integrity_tfm = NULL;
2615         tconn->peer_integrity_tfm = NULL;
2616         tconn->int_dig_in = NULL;
2617         tconn->int_dig_vv = NULL;
2618 }
2619
2620 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2621 {
2622         cpumask_var_t new_cpu_mask;
2623         int err;
2624
2625         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2626                 return -ENOMEM;
2627                 /*
2628                 retcode = ERR_NOMEM;
2629                 drbd_msg_put_info("unable to allocate cpumask");
2630                 */
2631
2632         /* silently ignore cpu mask on UP kernel */
2633         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2634                 /* FIXME: Get rid of constant 32 here */
2635                 err = bitmap_parse(res_opts->cpu_mask, 32,
2636                                    cpumask_bits(new_cpu_mask), nr_cpu_ids);
2637                 if (err) {
2638                         conn_warn(tconn, "bitmap_parse() failed with %d\n", err);
2639                         /* retcode = ERR_CPU_MASK_PARSE; */
2640                         goto fail;
2641                 }
2642         }
2643         tconn->res_opts = *res_opts;
2644         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2645                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2646                 drbd_calc_cpu_mask(tconn);
2647                 tconn->receiver.reset_cpu_mask = 1;
2648                 tconn->asender.reset_cpu_mask = 1;
2649                 tconn->worker.reset_cpu_mask = 1;
2650         }
2651         err = 0;
2652
2653 fail:
2654         free_cpumask_var(new_cpu_mask);
2655         return err;
2656
2657 }
2658
2659 /* caller must be under genl_lock() */
2660 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2661 {
2662         struct drbd_tconn *tconn;
2663
2664         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2665         if (!tconn)
2666                 return NULL;
2667
2668         tconn->name = kstrdup(name, GFP_KERNEL);
2669         if (!tconn->name)
2670                 goto fail;
2671
2672         if (drbd_alloc_socket(&tconn->data))
2673                 goto fail;
2674         if (drbd_alloc_socket(&tconn->meta))
2675                 goto fail;
2676
2677         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2678                 goto fail;
2679
2680         if (set_resource_options(tconn, res_opts))
2681                 goto fail;
2682
2683         if (!tl_init(tconn))
2684                 goto fail;
2685
2686         tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2687         if (!tconn->current_epoch)
2688                 goto fail;
2689         INIT_LIST_HEAD(&tconn->current_epoch->list);
2690         tconn->epochs = 1;
2691         spin_lock_init(&tconn->epoch_lock);
2692         tconn->write_ordering = WO_bdev_flush;
2693
2694         tconn->cstate = C_STANDALONE;
2695         mutex_init(&tconn->cstate_mutex);
2696         spin_lock_init(&tconn->req_lock);
2697         mutex_init(&tconn->conf_update);
2698         init_waitqueue_head(&tconn->ping_wait);
2699         idr_init(&tconn->volumes);
2700
2701         drbd_init_workqueue(&tconn->sender_work);
2702         mutex_init(&tconn->data.mutex);
2703         mutex_init(&tconn->meta.mutex);
2704
2705         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2706         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2707         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2708
2709         kref_init(&tconn->kref);
2710         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2711
2712         return tconn;
2713
2714 fail:
2715         kfree(tconn->current_epoch);
2716         tl_cleanup(tconn);
2717         free_cpumask_var(tconn->cpu_mask);
2718         drbd_free_socket(&tconn->meta);
2719         drbd_free_socket(&tconn->data);
2720         kfree(tconn->name);
2721         kfree(tconn);
2722
2723         return NULL;
2724 }
2725
2726 void conn_destroy(struct kref *kref)
2727 {
2728         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2729
2730         if (atomic_read(&tconn->current_epoch->epoch_size) !=  0)
2731                 conn_err(tconn, "epoch_size:%d\n", atomic_read(&tconn->current_epoch->epoch_size));
2732         kfree(tconn->current_epoch);
2733
2734         idr_destroy(&tconn->volumes);
2735
2736         free_cpumask_var(tconn->cpu_mask);
2737         drbd_free_socket(&tconn->meta);
2738         drbd_free_socket(&tconn->data);
2739         kfree(tconn->name);
2740         kfree(tconn->int_dig_in);
2741         kfree(tconn->int_dig_vv);
2742         kfree(tconn);
2743 }
2744
2745 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2746 {
2747         struct drbd_conf *mdev;
2748         struct gendisk *disk;
2749         struct request_queue *q;
2750         int vnr_got = vnr;
2751         int minor_got = minor;
2752         enum drbd_ret_code err = ERR_NOMEM;
2753
2754         mdev = minor_to_mdev(minor);
2755         if (mdev)
2756                 return ERR_MINOR_EXISTS;
2757
2758         /* GFP_KERNEL, we are outside of all write-out paths */
2759         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2760         if (!mdev)
2761                 return ERR_NOMEM;
2762
2763         kref_get(&tconn->kref);
2764         mdev->tconn = tconn;
2765
2766         mdev->minor = minor;
2767         mdev->vnr = vnr;
2768
2769         drbd_init_set_defaults(mdev);
2770
2771         q = blk_alloc_queue(GFP_KERNEL);
2772         if (!q)
2773                 goto out_no_q;
2774         mdev->rq_queue = q;
2775         q->queuedata   = mdev;
2776
2777         disk = alloc_disk(1);
2778         if (!disk)
2779                 goto out_no_disk;
2780         mdev->vdisk = disk;
2781
2782         set_disk_ro(disk, true);
2783
2784         disk->queue = q;
2785         disk->major = DRBD_MAJOR;
2786         disk->first_minor = minor;
2787         disk->fops = &drbd_ops;
2788         sprintf(disk->disk_name, "drbd%d", minor);
2789         disk->private_data = mdev;
2790
2791         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2792         /* we have no partitions. we contain only ourselves. */
2793         mdev->this_bdev->bd_contains = mdev->this_bdev;
2794
2795         q->backing_dev_info.congested_fn = drbd_congested;
2796         q->backing_dev_info.congested_data = mdev;
2797
2798         blk_queue_make_request(q, drbd_make_request);
2799         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2800            triggers a max_bio_size message upon first attach or connect. */
2801         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2802         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2803         blk_queue_merge_bvec(q, drbd_merge_bvec);
2804         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2805
2806         mdev->md_io_page = alloc_page(GFP_KERNEL);
2807         if (!mdev->md_io_page)
2808                 goto out_no_io_page;
2809
2810         if (drbd_bm_init(mdev))
2811                 goto out_no_bitmap;
2812         mdev->read_requests = RB_ROOT;
2813         mdev->write_requests = RB_ROOT;
2814
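        /* Claim the requested minor in the global idr.  idr_get_new_above()
         * may hand out a different slot; if so, the requested minor is
         * already taken and we bail out with ERR_MINOR_EXISTS. */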
2815         if (!idr_pre_get(&minors, GFP_KERNEL))
2816                 goto out_no_minor_idr;
2817         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2818                 goto out_no_minor_idr;
2819         if (minor_got != minor) {
2820                 err = ERR_MINOR_EXISTS;
2821                 drbd_msg_put_info("requested minor exists already");
2822                 goto out_idr_remove_minor;
2823         }
2824
2825         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2826                 goto out_idr_remove_minor;
2827         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2828                 goto out_idr_remove_minor;
2829         if (vnr_got != vnr) {
2830                 err = ERR_INVALID_REQUEST;
2831                 drbd_msg_put_info("requested volume exists already");
2832                 goto out_idr_remove_vol;
2833         }
2834         add_disk(disk);
2835         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2836
2837         /* inherit the connection state */
2838         mdev->state.conn = tconn->cstate;
2839         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2840                 drbd_connected(mdev);
2841
2842         return NO_ERROR;
2843
2844 out_idr_remove_vol:
2845         idr_remove(&tconn->volumes, vnr_got);
2846 out_idr_remove_minor:
2847         idr_remove(&minors, minor_got);
2848         synchronize_rcu();
2849 out_no_minor_idr:
2850         drbd_bm_cleanup(mdev);
2851 out_no_bitmap:
2852         __free_page(mdev->md_io_page);
2853 out_no_io_page:
2854         put_disk(disk);
2855 out_no_disk:
2856         blk_cleanup_queue(q);
2857 out_no_q:
2858         kfree(mdev);
2859         kref_put(&tconn->kref, &conn_destroy);
2860         return err;
2861 }
2862
2863 int __init drbd_init(void)
2864 {
2865         int err;
2866
2867         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2868                 printk(KERN_ERR
2869                        "drbd: invalid minor_count (%d)\n", minor_count);
2870 #ifdef MODULE
2871                 return -EINVAL;
2872 #else
2873                 minor_count = DRBD_MINOR_COUNT_DEF;
2874 #endif
2875         }
2876
2877         err = register_blkdev(DRBD_MAJOR, "drbd");
2878         if (err) {
2879                 printk(KERN_ERR
2880                        "drbd: unable to register block device major %d\n",
2881                        DRBD_MAJOR);
2882                 return err;
2883         }
2884
2885         err = drbd_genl_register();
2886         if (err) {
2887                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2888                 goto fail;
2889         }
2890
2891
2892         register_reboot_notifier(&drbd_notifier);
2893
2894         /*
2895          * allocate all necessary structs
2896          */
2897         err = -ENOMEM;
2898
2899         init_waitqueue_head(&drbd_pp_wait);
2900
2901         drbd_proc = NULL; /* play safe for drbd_cleanup */
2902         idr_init(&minors);
2903
2904         err = drbd_create_mempools();
2905         if (err)
2906                 goto fail;
2907
2908         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2909         if (!drbd_proc) {
2910                 printk(KERN_ERR "drbd: unable to register proc file\n");
2911                 goto fail;
2912         }
2913
2914         rwlock_init(&global_state_lock);
2915         INIT_LIST_HEAD(&drbd_tconns);
2916
2917         retry.wq = create_singlethread_workqueue("drbd-reissue");
2918         if (!retry.wq) {
2919                 printk(KERN_ERR "drbd: unable to create retry workqueue\n");
2920                 goto fail;
2921         }
2922         INIT_WORK(&retry.worker, do_retry);
2923         spin_lock_init(&retry.lock);
2924         INIT_LIST_HEAD(&retry.writes);
2925
2926         printk(KERN_INFO "drbd: initialized. "
2927                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2928                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2929         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2930         printk(KERN_INFO "drbd: registered as block device major %d\n",
2931                 DRBD_MAJOR);
2932
2933         return 0; /* Success! */
2934
2935 fail:
2936         drbd_cleanup();
2937         if (err == -ENOMEM)
2938                 /* currently always the case */
2939                 printk(KERN_ERR "drbd: ran out of memory\n");
2940         else
2941                 printk(KERN_ERR "drbd: initialization failure\n");
2942         return err;
2943 }
2944
2945 void drbd_free_bc(struct drbd_backing_dev *ldev)
2946 {
2947         if (ldev == NULL)
2948                 return;
2949
2950         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2951         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2952
2953         kfree(ldev);
2954 }
2955
2956 void drbd_free_sock(struct drbd_tconn *tconn)
2957 {
2958         if (tconn->data.socket) {
2959                 mutex_lock(&tconn->data.mutex);
2960                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2961                 sock_release(tconn->data.socket);
2962                 tconn->data.socket = NULL;
2963                 mutex_unlock(&tconn->data.mutex);
2964         }
2965         if (tconn->meta.socket) {
2966                 mutex_lock(&tconn->meta.mutex);
2967                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2968                 sock_release(tconn->meta.socket);
2969                 tconn->meta.socket = NULL;
2970                 mutex_unlock(&tconn->meta.mutex);
2971         }
2972 }
2973
2974 /* meta data management */
2975
2976 struct meta_data_on_disk {
2977         u64 la_size;           /* last agreed size. */
2978         u64 uuid[UI_SIZE];   /* UUIDs. */
2979         u64 device_uuid;
2980         u64 reserved_u64_1;
2981         u32 flags;             /* MDF */
2982         u32 magic;
2983         u32 md_size_sect;
2984         u32 al_offset;         /* offset to this block */
2985         u32 al_nr_extents;     /* important for restoring the AL */
2986               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2987         u32 bm_offset;         /* offset to the bitmap, from here */
2988         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2989         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2990         u32 reserved_u32[3];
2991
2992 } __packed;
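
/*
 * Editor's sketch (illustrative, not part of the driver): drbd_md_sync()
 * below writes this structure as a single 512 byte sector (it clears a
 * 512 byte buffer before filling it in), so the packed layout above must
 * never grow past that.  A compile-time check along these lines would make
 * the assumption explicit; the helper name is hypothetical and assumes
 * BUILD_BUG_ON() is reachable through the existing includes.
 */
static inline void drbd_md_on_disk_layout_check(void)
{
        BUILD_BUG_ON(sizeof(struct meta_data_on_disk) > 512);
}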
2993
2994 /**
2995  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2996  * @mdev:       DRBD device.
2997  */
2998 void drbd_md_sync(struct drbd_conf *mdev)
2999 {
3000         struct meta_data_on_disk *buffer;
3001         sector_t sector;
3002         int i;
3003
3004         del_timer(&mdev->md_sync_timer);
3005         /* timer may be rearmed by drbd_md_mark_dirty() now. */
3006         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3007                 return;
3008
3009         /* We use here D_FAILED and not D_ATTACHING because we try to write
3010          * metadata even if we detach due to a disk failure! */
3011         if (!get_ldev_if_state(mdev, D_FAILED))
3012                 return;
3013
3014         buffer = drbd_md_get_buffer(mdev);
3015         if (!buffer)
3016                 goto out;
3017
3018         memset(buffer, 0, 512);
3019
3020         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3021         for (i = UI_CURRENT; i < UI_SIZE; i++)
3022                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3023         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3024         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
3025
3026         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3027         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3028         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3029         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3030         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3031
3032         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3033         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3034
3035         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3036         sector = mdev->ldev->md.md_offset;
3037
3038         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3039                 /* this was a try anyway ... */
3040                 dev_err(DEV, "meta data update failed!\n");
3041                 drbd_chk_io_error(mdev, 1, true);
3042         }
3043
3044         /* Update mdev->ldev->md.la_size_sect,
3045          * since the on-disk metadata now records the current capacity. */
3046         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3047
3048         drbd_md_put_buffer(mdev);
3049 out:
3050         put_ldev(mdev);
3051 }
3052
3053 /**
3054  * drbd_md_read() - Reads in the meta data super block
3055  * @mdev:       DRBD device.
3056  * @bdev:       Device from which the meta data should be read in.
3057  *
3058  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3059  * something goes wrong.
3060  */
3061 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3062 {
3063         struct meta_data_on_disk *buffer;
3064         u32 magic, flags;
3065         int i, rv = NO_ERROR;
3066
3067         if (!get_ldev_if_state(mdev, D_ATTACHING))
3068                 return ERR_IO_MD_DISK;
3069
3070         buffer = drbd_md_get_buffer(mdev);
3071         if (!buffer)
3072                 goto out;
3073
3074         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3075                 /* NOTE: can't do normal error processing here as this is
3076                    called BEFORE disk is attached */
3077                 dev_err(DEV, "Error while reading metadata.\n");
3078                 rv = ERR_IO_MD_DISK;
3079                 goto err;
3080         }
3081
3082         magic = be32_to_cpu(buffer->magic);
3083         flags = be32_to_cpu(buffer->flags);
3084         if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3085             (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3086                 /* btw: that's Activity Log clean, not "all" clean. */
3087                 dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3088                 rv = ERR_MD_UNCLEAN;
3089                 goto err;
3090         }
3091         if (magic != DRBD_MD_MAGIC_08) {
3092                 if (magic == DRBD_MD_MAGIC_07)
3093                         dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3094                 else
3095                         dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3096                 rv = ERR_MD_INVALID;
3097                 goto err;
3098         }
3099         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3100                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3101                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3102                 rv = ERR_MD_INVALID;
3103                 goto err;
3104         }
3105         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3106                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3107                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3108                 rv = ERR_MD_INVALID;
3109                 goto err;
3110         }
3111         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3112                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3113                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3114                 rv = ERR_MD_INVALID;
3115                 goto err;
3116         }
3117
3118         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3119                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3120                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3121                 rv = ERR_MD_INVALID;
3122                 goto err;
3123         }
3124
3125         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3126         for (i = UI_CURRENT; i < UI_SIZE; i++)
3127                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3128         bdev->md.flags = be32_to_cpu(buffer->flags);
3129         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3130
3131         spin_lock_irq(&mdev->tconn->req_lock);
3132         if (mdev->state.conn < C_CONNECTED) {
3133                 int peer;
3134                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3135                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3136                 mdev->peer_max_bio_size = peer;
3137         }
3138         spin_unlock_irq(&mdev->tconn->req_lock);
3139
3140  err:
3141         drbd_md_put_buffer(mdev);
3142  out:
3143         put_ldev(mdev);
3144
3145         return rv;
3146 }
3147
3148 /**
3149  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3150  * @mdev:       DRBD device.
3151  *
3152  * Call this function if you change anything that should be written to
3153  * the meta-data super block. This function sets MD_DIRTY, and starts a
3154  * timer that ensures that within five seconds you have to call drbd_md_sync().
3155  */
3156 #ifdef DEBUG
3157 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3158 {
3159         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3160                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3161                 mdev->last_md_mark_dirty.line = line;
3162                 mdev->last_md_mark_dirty.func = func;
3163         }
3164 }
3165 #else
3166 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3167 {
3168         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3169                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3170 }
3171 #endif
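
/*
 * Illustrative usage sketch (hypothetical helper, not part of the driver):
 * code that changes persistent state marks the super block dirty through
 * the helpers above and either relies on the md_sync_timer to flush it
 * within a few seconds, or calls drbd_md_sync() itself when the change must
 * reach stable storage immediately, as drbd_uuid_new_current() does below.
 */
static void __maybe_unused example_persist_md_change(struct drbd_conf *mdev)
{
        if (!get_ldev(mdev))
                return;
        drbd_md_set_flag(mdev, MDF_FULL_SYNC); /* sets MD_DIRTY internally */
        drbd_md_sync(mdev);                    /* force it out right now */
        put_ldev(mdev);
}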
3172
3173 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3174 {
3175         int i;
3176
3177         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3178                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3179 }
3180
3181 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3182 {
3183         if (idx == UI_CURRENT) {
3184                 if (mdev->state.role == R_PRIMARY)
3185                         val |= 1;
3186                 else
3187                         val &= ~((u64)1);
3188
3189                 drbd_set_ed_uuid(mdev, val);
3190         }
3191
3192         mdev->ldev->md.uuid[idx] = val;
3193         drbd_md_mark_dirty(mdev);
3194 }
3195
3196
3197 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3198 {
3199         if (mdev->ldev->md.uuid[idx]) {
3200                 drbd_uuid_move_history(mdev);
3201                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3202         }
3203         _drbd_uuid_set(mdev, idx, val);
3204 }
3205
3206 /**
3207  * drbd_uuid_new_current() - Creates a new current UUID
3208  * @mdev:       DRBD device.
3209  *
3210  * Creates a new current UUID, and rotates the old current UUID into
3211  * the bitmap slot. Causes an incremental resync upon next connect.
3212  */
3213 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3214 {
3215         u64 val;
3216         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3217
3218         if (bm_uuid)
3219                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3220
3221         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3222
3223         get_random_bytes(&val, sizeof(u64));
3224         _drbd_uuid_set(mdev, UI_CURRENT, val);
3225         drbd_print_uuids(mdev, "new current UUID");
3226         /* get it to stable storage _now_ */
3227         drbd_md_sync(mdev);
3228 }
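
/*
 * Illustrative caller sketch (hypothetical, not part of the driver): a new
 * data generation is typically opened when the local data is about to
 * diverge from the peer, e.g. when becoming Primary without a connection.
 */
static void __maybe_unused example_open_new_data_generation(struct drbd_conf *mdev)
{
        if (get_ldev(mdev)) {
                drbd_uuid_new_current(mdev);
                put_ldev(mdev);
        }
}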
3229
3230 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3231 {
3232         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3233                 return;
3234
3235         if (val == 0) {
3236                 drbd_uuid_move_history(mdev);
3237                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3238                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3239         } else {
3240                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3241                 if (bm_uuid)
3242                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3243
3244                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3245         }
3246         drbd_md_mark_dirty(mdev);
3247 }
3248
3249 /**
3250  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3251  * @mdev:       DRBD device.
3252  *
3253  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3254  */
3255 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3256 {
3257         int rv = -EIO;
3258
3259         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3260                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3261                 drbd_md_sync(mdev);
3262                 drbd_bm_set_all(mdev);
3263
3264                 rv = drbd_bm_write(mdev);
3265
3266                 if (!rv) {
3267                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3268                         drbd_md_sync(mdev);
3269                 }
3270
3271                 put_ldev(mdev);
3272         }
3273
3274         return rv;
3275 }
3276
3277 /**
3278  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3279  * @mdev:       DRBD device.
3280  *
3281  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3282  */
3283 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3284 {
3285         int rv = -EIO;
3286
3287         drbd_resume_al(mdev);
3288         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3289                 drbd_bm_clear_all(mdev);
3290                 rv = drbd_bm_write(mdev);
3291                 put_ldev(mdev);
3292         }
3293
3294         return rv;
3295 }
3296
3297 static int w_bitmap_io(struct drbd_work *w, int unused)
3298 {
3299         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3300         struct drbd_conf *mdev = w->mdev;
3301         int rv = -EIO;
3302
3303         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3304
3305         if (get_ldev(mdev)) {
3306                 drbd_bm_lock(mdev, work->why, work->flags);
3307                 rv = work->io_fn(mdev);
3308                 drbd_bm_unlock(mdev);
3309                 put_ldev(mdev);
3310         }
3311
3312         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3313         wake_up(&mdev->misc_wait);
3314
3315         if (work->done)
3316                 work->done(mdev, rv);
3317
3318         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3319         work->why = NULL;
3320         work->flags = 0;
3321
3322         return 0;
3323 }
3324
3325 void drbd_ldev_destroy(struct drbd_conf *mdev)
3326 {
3327         lc_destroy(mdev->resync);
3328         mdev->resync = NULL;
3329         lc_destroy(mdev->act_log);
3330         mdev->act_log = NULL;
3331         __no_warn(local,
3332                 drbd_free_bc(mdev->ldev);
3333                 mdev->ldev = NULL;);
3334
3335         clear_bit(GO_DISKLESS, &mdev->flags);
3336 }
3337
3338 static int w_go_diskless(struct drbd_work *w, int unused)
3339 {
3340         struct drbd_conf *mdev = w->mdev;
3341
3342         D_ASSERT(mdev->state.disk == D_FAILED);
3343         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3344          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3345          * the protected members anymore, though, so once put_ldev reaches zero
3346          * again, it will be safe to free them. */
3347         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3348         return 0;
3349 }
3350
3351 void drbd_go_diskless(struct drbd_conf *mdev)
3352 {
3353         D_ASSERT(mdev->state.disk == D_FAILED);
3354         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3355                 drbd_queue_work(&mdev->tconn->sender_work, &mdev->go_diskless);
3356 }
3357
3358 /**
3359  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3360  * @mdev:       DRBD device.
3361  * @io_fn:      IO callback to be called when bitmap IO is possible
3362  * @done:       callback to be called after the bitmap IO was performed
3363  * @why:        Descriptive text of the reason for doing the IO
3364  *
3365  * While IO on the bitmap is in progress, application IO is frozen, which
3366  * ensures that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3367  * called from worker context. It MUST NOT be used while a previous such
3368  * work is still pending!
3369  */
3370 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3371                           int (*io_fn)(struct drbd_conf *),
3372                           void (*done)(struct drbd_conf *, int),
3373                           char *why, enum bm_flag flags)
3374 {
3375         D_ASSERT(current == mdev->tconn->worker.task);
3376
3377         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3378         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3379         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3380         if (mdev->bm_io_work.why)
3381                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3382                         why, mdev->bm_io_work.why);
3383
3384         mdev->bm_io_work.io_fn = io_fn;
3385         mdev->bm_io_work.done = done;
3386         mdev->bm_io_work.why = why;
3387         mdev->bm_io_work.flags = flags;
3388
3389         spin_lock_irq(&mdev->tconn->req_lock);
3390         set_bit(BITMAP_IO, &mdev->flags);
3391         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3392                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3393                         drbd_queue_work(&mdev->tconn->sender_work, &mdev->bm_io_work.w);
3394         }
3395         spin_unlock_irq(&mdev->tconn->req_lock);
3396 }
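
/*
 * Illustrative usage sketch (hypothetical, not part of the driver): from
 * worker context, a full-sync handshake could queue drbd_bmio_set_n_write()
 * from above and learn about completion through the done callback.  The
 * function names below are made up; BM_LOCKED_MASK is assumed to come from
 * drbd_int.h.
 */
static void __maybe_unused example_set_n_write_done(struct drbd_conf *mdev, int rv)
{
        if (rv)
                dev_err(DEV, "writing the full-sync bitmap failed (%d)\n", rv);
}

static void __maybe_unused example_queue_full_sync(struct drbd_conf *mdev)
{
        /* only valid from the worker thread, see the comment above */
        drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
                             &example_set_n_write_done,
                             "set_n_write from example", BM_LOCKED_MASK);
}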
3397
3398 /**
3399  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3400  * @mdev:       DRBD device.
3401  * @io_fn:      IO callback to be called when bitmap IO is possible
3402  * @why:        Descriptive text of the reason for doing the IO
3403  *
3404  * Freezes application IO while the actual bitmap IO operation runs.  This
3405  * function MAY NOT be called from worker context.
3406  */
3407 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3408                 char *why, enum bm_flag flags)
3409 {
3410         int rv;
3411
3412         D_ASSERT(current != mdev->tconn->worker.task);
3413
3414         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3415                 drbd_suspend_io(mdev);
3416
3417         drbd_bm_lock(mdev, why, flags);
3418         rv = io_fn(mdev);
3419         drbd_bm_unlock(mdev);
3420
3421         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3422                 drbd_resume_io(mdev);
3423
3424         return rv;
3425 }
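
/*
 * Illustrative usage sketch (hypothetical, not part of the driver): from a
 * non-worker context, e.g. while attaching a disk, the synchronous variant
 * runs one of the io_fn helpers above directly:
 *
 *      err = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *                           "set_n_write from example", BM_LOCKED_MASK);
 *
 * Unless BM_LOCKED_SET_ALLOWED is passed, application IO stays suspended
 * for the duration of the bitmap IO.
 */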
3426
3427 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3428 {
3429         if ((mdev->ldev->md.flags & flag) != flag) {
3430                 drbd_md_mark_dirty(mdev);
3431                 mdev->ldev->md.flags |= flag;
3432         }
3433 }
3434
3435 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3436 {
3437         if ((mdev->ldev->md.flags & flag) != 0) {
3438                 drbd_md_mark_dirty(mdev);
3439                 mdev->ldev->md.flags &= ~flag;
3440         }
3441 }
3442 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3443 {
3444         return (bdev->md.flags & flag) != 0;
3445 }
3446
3447 static void md_sync_timer_fn(unsigned long data)
3448 {
3449         struct drbd_conf *mdev = (struct drbd_conf *) data;
3450
3451         drbd_queue_work_front(&mdev->tconn->sender_work, &mdev->md_sync_work);
3452 }
3453
3454 static int w_md_sync(struct drbd_work *w, int unused)
3455 {
3456         struct drbd_conf *mdev = w->mdev;
3457
3458         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3459 #ifdef DEBUG
3460         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3461                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3462 #endif
3463         drbd_md_sync(mdev);
3464         return 0;
3465 }
3466
3467 const char *cmdname(enum drbd_packet cmd)
3468 {
3469         /* THINK may need to become several global tables
3470          * when we want to support more than
3471          * one PRO_VERSION */
3472         static const char *cmdnames[] = {
3473                 [P_DATA]                = "Data",
3474                 [P_DATA_REPLY]          = "DataReply",
3475                 [P_RS_DATA_REPLY]       = "RSDataReply",
3476                 [P_BARRIER]             = "Barrier",
3477                 [P_BITMAP]              = "ReportBitMap",
3478                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3479                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3480                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3481                 [P_DATA_REQUEST]        = "DataRequest",
3482                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3483                 [P_SYNC_PARAM]          = "SyncParam",
3484                 [P_SYNC_PARAM89]        = "SyncParam89",
3485                 [P_PROTOCOL]            = "ReportProtocol",
3486                 [P_UUIDS]               = "ReportUUIDs",
3487                 [P_SIZES]               = "ReportSizes",
3488                 [P_STATE]               = "ReportState",
3489                 [P_SYNC_UUID]           = "ReportSyncUUID",
3490                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3491                 [P_AUTH_RESPONSE]       = "AuthResponse",
3492                 [P_PING]                = "Ping",
3493                 [P_PING_ACK]            = "PingAck",
3494                 [P_RECV_ACK]            = "RecvAck",
3495                 [P_WRITE_ACK]           = "WriteAck",
3496                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3497                 [P_DISCARD_WRITE]        = "DiscardWrite",
3498                 [P_NEG_ACK]             = "NegAck",
3499                 [P_NEG_DREPLY]          = "NegDReply",
3500                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3501                 [P_BARRIER_ACK]         = "BarrierAck",
3502                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3503                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3504                 [P_OV_REQUEST]          = "OVRequest",
3505                 [P_OV_REPLY]            = "OVReply",
3506                 [P_OV_RESULT]           = "OVResult",
3507                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3508                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3509                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3510                 [P_DELAY_PROBE]         = "DelayProbe",
3511                 [P_OUT_OF_SYNC]         = "OutOfSync",
3513                 [P_RS_CANCEL]           = "RSCancel",
3514                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3515                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3516                 [P_RETRY_WRITE]         = "retry_write",
3517                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3518
3519                 /* enum drbd_packet, but not commands - obsoleted flags:
3520                  *      P_MAY_IGNORE
3521                  *      P_MAX_OPT_CMD
3522                  */
3523         };
3524
3525         /* too big for the array: 0xfffX */
3526         if (cmd == P_INITIAL_META)
3527                 return "InitialMeta";
3528         if (cmd == P_INITIAL_DATA)
3529                 return "InitialData";
3530         if (cmd == P_CONNECTION_FEATURES)
3531                 return "ConnectionFeatures";
3532         if (cmd >= ARRAY_SIZE(cmdnames))
3533                 return "Unknown";
3534         return cmdnames[cmd];
3535 }
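
/*
 * Illustrative usage sketch (hypothetical helper): receive and logging paths
 * can turn a wire command into something readable without open-coding the
 * table above.
 */
static void __maybe_unused example_log_packet(struct drbd_conf *mdev,
                                              enum drbd_packet cmd)
{
        dev_info(DEV, "processing %s packet\n", cmdname(cmd));
}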
3536
3537 /**
3538  * drbd_wait_misc  -  wait for a request to make progress
3539  * @mdev:       device associated with the request
3540  * @i:          the struct drbd_interval embedded in struct drbd_request or
3541  *              struct drbd_peer_request
3542  */
3543 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3544 {
3545         struct net_conf *nc;
3546         DEFINE_WAIT(wait);
3547         long timeout;
3548
3549         rcu_read_lock();
3550         nc = rcu_dereference(mdev->tconn->net_conf);
3551         if (!nc) {
3552                 rcu_read_unlock();
3553                 return -ETIMEDOUT;
3554         }
3555         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3556         rcu_read_unlock();
3557
3558         /* Ask the completion path to wake up mdev->misc_wait on progress. */
3559         i->waiting = true;
3560         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3561         spin_unlock_irq(&mdev->tconn->req_lock);
3562         timeout = schedule_timeout(timeout);
3563         finish_wait(&mdev->misc_wait, &wait);
3564         spin_lock_irq(&mdev->tconn->req_lock);
3565         if (!timeout || mdev->state.conn < C_CONNECTED)
3566                 return -ETIMEDOUT;
3567         if (signal_pending(current))
3568                 return -ERESTARTSYS;
3569         return 0;
3570 }
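
/*
 * Illustrative sketch of the timeout arithmetic above (hypothetical helper):
 * net_conf->timeout is configured in tenths of a second, so e.g.
 * timeout = 60 (6 seconds) combined with ko_count = 7 gives up after
 * 42 seconds, while ko_count = 0 waits indefinitely.
 */
static long __maybe_unused example_ko_timeout(unsigned int timeout_tenths,
                                              unsigned int ko_count)
{
        return ko_count ? (long)timeout_tenths * HZ / 10 * ko_count
                        : MAX_SCHEDULE_TIMEOUT;
}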
3571
3572 #ifdef CONFIG_DRBD_FAULT_INJECTION
3573 /* Fault insertion support including random number generator shamelessly
3574  * stolen from kernel/rcutorture.c */
3575 struct fault_random_state {
3576         unsigned long state;
3577         unsigned long count;
3578 };
3579
3580 #define FAULT_RANDOM_MULT 39916801  /* prime */
3581 #define FAULT_RANDOM_ADD        479001701 /* prime */
3582 #define FAULT_RANDOM_REFRESH 10000
3583
3584 /*
3585  * Crude but fast random-number generator.  Uses a linear congruential
3586  * generator, with occasional help from get_random_bytes().
3587  */
3588 static unsigned long
3589 _drbd_fault_random(struct fault_random_state *rsp)
3590 {
3591         long refresh;
3592
3593         if (!rsp->count--) {
3594                 get_random_bytes(&refresh, sizeof(refresh));
3595                 rsp->state += refresh;
3596                 rsp->count = FAULT_RANDOM_REFRESH;
3597         }
3598         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3599         return swahw32(rsp->state);
3600 }
3601
3602 static char *
3603 _drbd_fault_str(unsigned int type) {
3604         static char *_faults[] = {
3605                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3606                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3607                 [DRBD_FAULT_RS_WR] = "Resync write",
3608                 [DRBD_FAULT_RS_RD] = "Resync read",
3609                 [DRBD_FAULT_DT_WR] = "Data write",
3610                 [DRBD_FAULT_DT_RD] = "Data read",
3611                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3612                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3613                 [DRBD_FAULT_AL_EE] = "EE allocation",
3614                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3615         };
3616
3617         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3618 }
3619
3620 unsigned int
3621 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3622 {
3623         static struct fault_random_state rrs = {0, 0};
3624
3625         unsigned int ret = (
3626                 (fault_devs == 0 ||
3627                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3628                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3629
3630         if (ret) {
3631                 fault_count++;
3632
3633                 if (__ratelimit(&drbd_ratelimit_state))
3634                         dev_warn(DEV, "***Simulating %s failure\n",
3635                                 _drbd_fault_str(type));
3636         }
3637
3638         return ret;
3639 }
3640 #endif
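
/*
 * Illustrative call-site sketch (hypothetical, not part of the driver): IO
 * submission paths consult the fault injector before issuing real requests,
 * here assumed to happen through the drbd_insert_fault() wrapper from
 * drbd_int.h, which evaluates to 0 when CONFIG_DRBD_FAULT_INJECTION is off.
 * Whether a fault fires is governed by the fault_rate (percentage) and
 * fault_devs (minor bitmask) module parameters used by _drbd_insert_fault().
 */
static void __maybe_unused example_submit_or_fail(struct drbd_conf *mdev,
                                                  struct bio *bio)
{
        if (drbd_insert_fault(mdev, DRBD_FAULT_DT_WR))
                bio_endio(bio, -EIO);           /* simulate a failed data write */
        else
                generic_make_request(bio);      /* pass it down to the backing device */
}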
3641
3642 const char *drbd_buildtag(void)
3643 {
3644         /* When DRBD is built from external sources, this carries a
3645            reference to the git hash of that source code. */
3646
3647         static char buildtag[38] = "\0uilt-in";
3648
3649         if (buildtag[0] == 0) {
3650 #ifdef CONFIG_MODULES
3651                 if (THIS_MODULE != NULL)
3652                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3653                 else
3654 #endif
3655                         buildtag[0] = 'b';
3656         }
3657
3658         return buildtag;
3659 }
3660
3661 module_init(drbd_init)
3662 module_exit(drbd_cleanup)
3663
3664 EXPORT_SYMBOL(drbd_conn_str);
3665 EXPORT_SYMBOL(drbd_role_str);
3666 EXPORT_SYMBOL(drbd_disk_str);
3667 EXPORT_SYMBOL(drbd_set_st_err_str);