drbd: Silenced compiler warnings
linux-2.6-block.git: drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
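/*
 * Illustrative sketch, compiled out: how such a private chain can be built
 * and walked by hand.  It assumes only what the helpers above already rely
 * on: page->private is the "next" pointer, read via page_chain_next() from
 * drbd_int.h, with 0 as the end-of-chain marker.
 */
#if 0
static struct page *page_chain_build(unsigned int n)
{
        struct page *chain = NULL;
        unsigned int i;

        for (i = 0; i < n; i++) {
                struct page *p = alloc_page(GFP_TRY);
                if (!p)
                        break;
                /* link in front: the new page points to the old head */
                set_page_private(p, (unsigned long)chain);
                chain = p;
        }
        return chain; /* release again with page_chain_free() */
}
#endif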
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retries until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
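/*
 * Usage sketch, compiled out: a hypothetical caller pairing
 * drbd_alloc_pages() with drbd_free_pages() below.  With retry=true the
 * allocation blocks until DRBD frees pages elsewhere; NULL is only
 * returned if we were signalled (or if retry was false).
 */
#if 0
static int example_use_pages(struct drbd_conf *mdev, unsigned int n)
{
        struct page *chain = drbd_alloc_pages(mdev, n, true);

        if (!chain)
                return -ENOMEM;
        /* ... fill the pages via page_chain_for_each() ... */
        drbd_free_pages(mdev, chain, 0);
        return 0;
}
#endif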
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
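/*
 * Sketch, compiled out: the locking contract above in practice.  The
 * underscore variant expects req_lock to be held by the caller; the plain
 * drbd_wait_ee_list_empty() below takes and releases the lock itself.
 */
#if 0
static void example_wait_active_ee(struct drbd_conf *mdev)
{
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
        spin_unlock_irq(&mdev->tconn->req_lock);
}
#endif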
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* See also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
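/*
 * Sketch, compiled out: per the tcp(7) quote above, the buffer sizes must
 * be set between socket creation and connect()/listen(); this hypothetical
 * helper mirrors the order used by drbd_try_connect() below.
 */
#if 0
static struct socket *example_connect(struct sockaddr *addr, int addr_len,
                                      unsigned int snd, unsigned int rcv)
{
        struct socket *sock;
        int err;

        err = sock_create_kern(addr->sa_family, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0)
                return NULL;
        drbd_setbufsize(sock, snd, rcv); /* before connect()! */
        err = sock->ops->connect(sock, addr, addr_len, 0);
        if (err < 0) {
                sock_release(sock);
                return NULL;
        }
        return sock;
}
#endif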
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657         /* Explicitly bind to the configured IP as source IP
658          * for the outgoing connections.
659          * This is needed for multihomed hosts and to be
660          * able to use lo: interfaces for drbd.
661          * Make sure to use 0 as port number, so linux selects
662          * a free one dynamically.
663          */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* +/- 14.3%, i.e. 28.5% peak-to-peak random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
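/* The MSG_DONTWAIT | MSG_PEEK probe above never consumes data: pending
 * bytes or -EAGAIN mean the connection is alive; 0 (orderly shutdown by
 * the peer) or any other error means it is gone, and the socket gets
 * released with *sock set to NULL. */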
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849         bool discard_my_data;
850
851         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
852                 return -2;
853
854         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
855
856         /* Assume that the peer only understands protocol 80 until we know better.  */
857         tconn->agreed_pro_version = 80;
858
859         do {
860                 struct socket *s;
861
862                 for (try = 0;;) {
863                         /* 3 tries, this should take less than a second! */
864                         s = drbd_try_connect(tconn);
865                         if (s || ++try >= 3)
866                                 break;
867                         /* give the other side time to call bind() & listen() */
868                         schedule_timeout_interruptible(HZ / 10);
869                 }
870
871                 if (s) {
872                         if (!tconn->data.socket) {
873                                 tconn->data.socket = s;
874                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
875                         } else if (!tconn->meta.socket) {
876                                 tconn->meta.socket = s;
877                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
878                         } else {
879                                 conn_err(tconn, "Logic error in conn_connect()\n");
880                                 goto out_release_sockets;
881                         }
882                 }
883
884                 if (tconn->data.socket && tconn->meta.socket) {
885                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
886                         ok = drbd_socket_okay(&tconn->data.socket);
887                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
888                         if (ok)
889                                 break;
890                 }
891
892 retry:
893                 s = drbd_wait_for_connect(tconn);
894                 if (s) {
895                         try = receive_first_packet(tconn, s);
896                         drbd_socket_okay(&tconn->data.socket);
897                         drbd_socket_okay(&tconn->meta.socket);
898                         switch (try) {
899                         case P_INITIAL_DATA:
900                                 if (tconn->data.socket) {
901                                         conn_warn(tconn, "initial packet S crossed\n");
902                                         sock_release(tconn->data.socket);
903                                 }
904                                 tconn->data.socket = s;
905                                 break;
906                         case P_INITIAL_META:
907                                 if (tconn->meta.socket) {
908                                         conn_warn(tconn, "initial packet M crossed\n");
909                                         sock_release(tconn->meta.socket);
910                                 }
911                                 tconn->meta.socket = s;
912                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
913                                 break;
914                         default:
915                                 conn_warn(tconn, "Error receiving initial packet\n");
916                                 sock_release(s);
917                                 if (random32() & 1)
918                                         goto retry;
919                         }
920                 }
921
922                 if (tconn->cstate <= C_DISCONNECTING)
923                         goto out_release_sockets;
924                 if (signal_pending(current)) {
925                         flush_signals(current);
926                         smp_rmb();
927                         if (get_t_state(&tconn->receiver) == EXITING)
928                                 goto out_release_sockets;
929                 }
930
931                 if (tconn->data.socket && tconn->meta.socket) {
932                         ok = drbd_socket_okay(&tconn->data.socket);
933                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
934                         if (ok)
935                                 break;
936                 }
937         } while (1);
938
939         sock  = tconn->data.socket;
940         msock = tconn->meta.socket;
941
942         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
944
945         sock->sk->sk_allocation = GFP_NOIO;
946         msock->sk->sk_allocation = GFP_NOIO;
947
948         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
949         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
950
951         /* NOT YET ...
952          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
953          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
954          * first set it to the P_CONNECTION_FEATURES timeout,
955          * which we set to 4x the configured ping_timeout. */
956         rcu_read_lock();
957         nc = rcu_dereference(tconn->net_conf);
958
959         sock->sk->sk_sndtimeo =
960         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
961
962         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
963         timeout = nc->timeout * HZ / 10;
964         discard_my_data = nc->discard_my_data;
965         rcu_read_unlock();
966
967         msock->sk->sk_sndtimeo = timeout;
968
969         /* we don't want delays.
970          * we use TCP_CORK where appropriate, though */
971         drbd_tcp_nodelay(sock);
972         drbd_tcp_nodelay(msock);
973
974         tconn->last_received = jiffies;
975
976         h = drbd_do_features(tconn);
977         if (h <= 0)
978                 return h;
979
980         if (tconn->cram_hmac_tfm) {
981                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
982                 switch (drbd_do_auth(tconn)) {
983                 case -1:
984                         conn_err(tconn, "Authentication of peer failed\n");
985                         return -1;
986                 case 0:
987                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
988                         return 0;
989                 }
990         }
991
992         sock->sk->sk_sndtimeo = timeout;
993         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
994
995         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
996                 return -1;
997
998         rcu_read_lock();
999         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1000                 kref_get(&mdev->kref);
1001                 rcu_read_unlock();
1002
1003                 if (discard_my_data)
1004                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1005                 else
1006                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1007
1008                 drbd_connected(mdev);
1009                 kref_put(&mdev->kref, &drbd_minor_destroy);
1010                 rcu_read_lock();
1011         }
1012         rcu_read_unlock();
1013
1014         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1015                 return 0;
1016
1017         drbd_thread_start(&tconn->asender);
1018
1019         mutex_lock(&tconn->conf_update);
1020         /* The discard_my_data flag is a single-shot modifier to the next
1021          * connection attempt, the handshake of which is now well underway.
1022          * No need for rcu style copying of the whole struct
1023          * just to clear a single value. */
1024         tconn->net_conf->discard_my_data = 0;
1025         mutex_unlock(&tconn->conf_update);
1026
1027         return h;
1028
1029 out_release_sockets:
1030         if (tconn->data.socket) {
1031                 sock_release(tconn->data.socket);
1032                 tconn->data.socket = NULL;
1033         }
1034         if (tconn->meta.socket) {
1035                 sock_release(tconn->meta.socket);
1036                 tconn->meta.socket = NULL;
1037         }
1038         return -1;
1039 }
1040
1041 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1042 {
1043         unsigned int header_size = drbd_header_size(tconn);
1044
1045         if (header_size == sizeof(struct p_header100) &&
1046             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1047                 struct p_header100 *h = header;
1048                 if (h->pad != 0) {
1049                         conn_err(tconn, "Header padding is not zero\n");
1050                         return -EINVAL;
1051                 }
1052                 pi->vnr = be16_to_cpu(h->volume);
1053                 pi->cmd = be16_to_cpu(h->command);
1054                 pi->size = be32_to_cpu(h->length);
1055         } else if (header_size == sizeof(struct p_header95) &&
1056                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1057                 struct p_header95 *h = header;
1058                 pi->cmd = be16_to_cpu(h->command);
1059                 pi->size = be32_to_cpu(h->length);
1060                 pi->vnr = 0;
1061         } else if (header_size == sizeof(struct p_header80) &&
1062                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1063                 struct p_header80 *h = header;
1064                 pi->cmd = be16_to_cpu(h->command);
1065                 pi->size = be16_to_cpu(h->length);
1066                 pi->vnr = 0;
1067         } else {
1068                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1069                          be32_to_cpu(*(__be32 *)header),
1070                          tconn->agreed_pro_version);
1071                 return -EINVAL;
1072         }
1073         pi->data = header + header_size;
1074         return 0;
1075 }
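/*
 * For reference, the three on-wire layouts dispatched above, with field
 * sizes inferred from the be16/be32 conversions (see the p_header structs
 * in drbd_int.h for the authoritative definitions; drbd_header_size()
 * selects one based on tconn->agreed_pro_version):
 *
 *   p_header80:  be32 magic, be16 command, be16 length
 *   p_header95:  be16 magic, be16 command, be32 length
 *   p_header100: be32 magic, be16 volume, be16 command, be32 length,
 *                plus padding that must be zero
 */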
1076
1077 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1078 {
1079         void *buffer = tconn->data.rbuf;
1080         int err;
1081
1082         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1083         if (err)
1084                 return err;
1085
1086         err = decode_header(tconn, buffer, pi);
1087         tconn->last_received = jiffies;
1088
1089         return err;
1090 }
1091
1092 static void drbd_flush(struct drbd_conf *mdev)
1093 {
1094         int rv;
1095
1096         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1097                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1098                                         NULL);
1099                 if (rv) {
1100                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1101                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1102                          * don't try again for ANY return value != 0
1103                          * if (rv == -EOPNOTSUPP) */
1104                         drbd_bump_write_ordering(mdev, WO_drain_io);
1105                 }
1106                 put_ldev(mdev);
1107         }
1108 }
1109
1110 /**
1111  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1112  * @mdev:       DRBD device.
1113  * @epoch:      Epoch object.
1114  * @ev:         Epoch event.
1115  */
1116 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1117                                                struct drbd_epoch *epoch,
1118                                                enum epoch_event ev)
1119 {
1120         int epoch_size;
1121         struct drbd_epoch *next_epoch;
1122         enum finish_epoch rv = FE_STILL_LIVE;
1123
1124         spin_lock(&mdev->epoch_lock);
1125         do {
1126                 next_epoch = NULL;
1127
1128                 epoch_size = atomic_read(&epoch->epoch_size);
1129
1130                 switch (ev & ~EV_CLEANUP) {
1131                 case EV_PUT:
1132                         atomic_dec(&epoch->active);
1133                         break;
1134                 case EV_GOT_BARRIER_NR:
1135                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1136                         break;
1137                 case EV_BECAME_LAST:
1138                         /* nothing to do */
1139                         break;
1140                 }
1141
1142                 if (epoch_size != 0 &&
1143                     atomic_read(&epoch->active) == 0 &&
1144                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1145                         if (!(ev & EV_CLEANUP)) {
1146                                 spin_unlock(&mdev->epoch_lock);
1147                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1148                                 spin_lock(&mdev->epoch_lock);
1149                         }
1150                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1151                                 dec_unacked(mdev);
1152
1153                         if (mdev->current_epoch != epoch) {
1154                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1155                                 list_del(&epoch->list);
1156                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1157                                 mdev->epochs--;
1158                                 kfree(epoch);
1159
1160                                 if (rv == FE_STILL_LIVE)
1161                                         rv = FE_DESTROYED;
1162                         } else {
1163                                 epoch->flags = 0;
1164                                 atomic_set(&epoch->epoch_size, 0);
1165                                 /* atomic_set(&epoch->active, 0); is already zero */
1166                                 if (rv == FE_STILL_LIVE)
1167                                         rv = FE_RECYCLED;
1168                                 wake_up(&mdev->ee_wait);
1169                         }
1170                 }
1171
1172                 if (!next_epoch)
1173                         break;
1174
1175                 epoch = next_epoch;
1176         } while (1);
1177
1178         spin_unlock(&mdev->epoch_lock);
1179
1180         return rv;
1181 }
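/* In short: an epoch can be finished once it has seen at least one request
 * (epoch_size != 0), none are still active, and it either knows its barrier
 * number (normal path: send P_BARRIER_ACK) or we are in cleanup.  The
 * newest epoch is recycled in place (FE_RECYCLED); older ones are unlinked
 * and freed (FE_DESTROYED), possibly cascading to their successors. */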
1182
1183 /**
1184  * drbd_bump_write_ordering() - Fall back to another write ordering method
1185  * @mdev:       DRBD device.
1186  * @wo:         Write ordering method to try.
1187  */
1188 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1189 {
1190         struct disk_conf *dc;
1191         enum write_ordering_e pwo;
1192         static char *write_ordering_str[] = {
1193                 [WO_none] = "none",
1194                 [WO_drain_io] = "drain",
1195                 [WO_bdev_flush] = "flush",
1196         };
1197
1198         pwo = mdev->write_ordering;
1199         wo = min(pwo, wo);
1200         rcu_read_lock();
1201         dc = rcu_dereference(mdev->ldev->disk_conf);
1202
1203         if (wo == WO_bdev_flush && !dc->disk_flushes)
1204                 wo = WO_drain_io;
1205         if (wo == WO_drain_io && !dc->disk_drain)
1206                 wo = WO_none;
1207         rcu_read_unlock();
1208         mdev->write_ordering = wo;
1209         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1210                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1211 }
1212
1213 /**
1214  * drbd_submit_peer_request() - Submit a peer request to the local backing device
1215  * @mdev:       DRBD device.
1216  * @peer_req:   peer request
1217  * @rw:         flag field, see bio->bi_rw
1218  *
1219  * May spread the pages to multiple bios,
1220  * depending on bio_add_page restrictions.
1221  *
1222  * Returns 0 if all bios have been submitted,
1223  * -ENOMEM if we could not allocate enough bios,
1224  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1225  *  single page to an empty bio (which should never happen and likely indicates
1226  *  that the lower level IO stack is in some way broken). This has been observed
1227  *  on certain Xen deployments.
1228  */
1229 /* TODO allocate from our own bio_set. */
1230 int drbd_submit_peer_request(struct drbd_conf *mdev,
1231                              struct drbd_peer_request *peer_req,
1232                              const unsigned rw, const int fault_type)
1233 {
1234         struct bio *bios = NULL;
1235         struct bio *bio;
1236         struct page *page = peer_req->pages;
1237         sector_t sector = peer_req->i.sector;
1238         unsigned ds = peer_req->i.size;
1239         unsigned n_bios = 0;
1240         unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
1241         int err = -ENOMEM;
1242
1243         /* In most cases, we will only need one bio.  But in case the lower
1244          * level restrictions happen to be different at this offset on this
1245          * side than those of the sending peer, we may need to submit the
1246          * request in more than one bio.
1247          *
1248          * Plain bio_alloc is good enough here, this is no DRBD internally
1249          * generated bio, but a bio allocated on behalf of the peer.
1250          */
1251 next_bio:
1252         bio = bio_alloc(GFP_NOIO, nr_pages);
1253         if (!bio) {
1254                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1255                 goto fail;
1256         }
1257         /* > peer_req->i.sector, unless this is the first bio */
1258         bio->bi_sector = sector;
1259         bio->bi_bdev = mdev->ldev->backing_bdev;
1260         bio->bi_rw = rw;
1261         bio->bi_private = peer_req;
1262         bio->bi_end_io = drbd_peer_request_endio;
1263
1264         bio->bi_next = bios;
1265         bios = bio;
1266         ++n_bios;
1267
1268         page_chain_for_each(page) {
1269                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1270                 if (!bio_add_page(bio, page, len, 0)) {
1271                         /* A single page must always be possible!
1272                          * But in case it fails anyways,
1273                          * we deal with it, and complain (below). */
1274                         if (bio->bi_vcnt == 0) {
1275                                 dev_err(DEV,
1276                                         "bio_add_page failed for len=%u, "
1277                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1278                                         len, (unsigned long long)bio->bi_sector);
1279                                 err = -ENOSPC;
1280                                 goto fail;
1281                         }
1282                         goto next_bio;
1283                 }
1284                 ds -= len;
1285                 sector += len >> 9;
1286                 --nr_pages;
1287         }
1288         D_ASSERT(page == NULL);
1289         D_ASSERT(ds == 0);
1290
1291         atomic_set(&peer_req->pending_bios, n_bios);
1292         do {
1293                 bio = bios;
1294                 bios = bios->bi_next;
1295                 bio->bi_next = NULL;
1296
1297                 drbd_generic_make_request(mdev, fault_type, bio);
1298         } while (bios);
1299         return 0;
1300
1301 fail:
1302         while (bios) {
1303                 bio = bios;
1304                 bios = bios->bi_next;
1305                 bio_put(bio);
1306         }
1307         return err;
1308 }
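/* Note on the arithmetic above: bio sectors are 512 bytes, hence the
 * "sector += len >> 9" advance; when bio_add_page() refuses a page on a
 * non-empty bio (lower-level queue limits), a fresh bio is started at the
 * current sector with room for the remaining nr_pages. */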
1309
1310 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1311                                              struct drbd_peer_request *peer_req)
1312 {
1313         struct drbd_interval *i = &peer_req->i;
1314
1315         drbd_remove_interval(&mdev->write_requests, i);
1316         drbd_clear_interval(i);
1317
1318         /* Wake up any processes waiting for this peer request to complete.  */
1319         if (i->waiting)
1320                 wake_up(&mdev->misc_wait);
1321 }
1322
1323 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1324 {
1325         struct drbd_conf *mdev;
1326         int rv;
1327         struct p_barrier *p = pi->data;
1328         struct drbd_epoch *epoch;
1329
1330         mdev = vnr_to_mdev(tconn, pi->vnr);
1331         if (!mdev)
1332                 return -EIO;
1333
1334         inc_unacked(mdev);
1335
1336         mdev->current_epoch->barrier_nr = p->barrier;
1337         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1338
1339         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1340          * the activity log, which means it would not be resynced in case the
1341          * R_PRIMARY crashes now.
1342          * Therefore we must send the barrier_ack after the barrier request was
1343          * completed. */
1344         switch (mdev->write_ordering) {
1345         case WO_none:
1346                 if (rv == FE_RECYCLED)
1347                         return 0;
1348
1349                 /* receiver context, in the writeout path of the other node.
1350                  * avoid potential distributed deadlock */
1351                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1352                 if (epoch)
1353                         break;
1354                 /* else: allocation failed; warn, then fall through to
1355                  * the drain/flush handling below */
1356                 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1357
1358         case WO_bdev_flush:
1359         case WO_drain_io:
1360                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1361                 drbd_flush(mdev);
1362
1363                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1364                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1365                         if (epoch)
1366                                 break;
1367                 }
1368
1369                 epoch = mdev->current_epoch;
1370                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1371
1372                 D_ASSERT(atomic_read(&epoch->active) == 0);
1373                 D_ASSERT(epoch->flags == 0);
1374
1375                 return 0;
1376         default:
1377                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1378                 return -EIO;
1379         }
1380
1381         epoch->flags = 0;
1382         atomic_set(&epoch->epoch_size, 0);
1383         atomic_set(&epoch->active, 0);
1384
1385         spin_lock(&mdev->epoch_lock);
1386         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1387                 list_add(&epoch->list, &mdev->current_epoch->list);
1388                 mdev->current_epoch = epoch;
1389                 mdev->epochs++;
1390         } else {
1391                 /* The current_epoch got recycled while we allocated this one... */
1392                 kfree(epoch);
1393         }
1394         spin_unlock(&mdev->epoch_lock);
1395
1396         return 0;
1397 }
1398
1399 /* used from receive_RSDataReply (recv_resync_read)
1400  * and from receive_Data */
1401 static struct drbd_peer_request *
1402 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1403               int data_size) __must_hold(local)
1404 {
1405         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1406         struct drbd_peer_request *peer_req;
1407         struct page *page;
1408         int dgs, ds, err;
1409         void *dig_in = mdev->tconn->int_dig_in;
1410         void *dig_vv = mdev->tconn->int_dig_vv;
1411         unsigned long *data;
1412
1413         dgs = 0;
1414         if (mdev->tconn->peer_integrity_tfm) {
1415                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1416                 /*
1417                  * FIXME: Receive the incoming digest into the receive buffer
1418                  *        here, together with its struct p_data?
1419                  */
1420                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1421                 if (err)
1422                         return NULL;
1423                 data_size -= dgs;
1424         }
1425
1426         if (!expect(data_size != 0))
1427                 return NULL;
1428         if (!expect(IS_ALIGNED(data_size, 512)))
1429                 return NULL;
1430         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1431                 return NULL;
1432
1433         /* even though we trust our peer,
1434          * we sometimes have to double check. */
1435         if (sector + (data_size>>9) > capacity) {
1436                 dev_err(DEV, "request from peer beyond end of local disk: "
1437                         "capacity: %llus < sector: %llus + size: %u\n",
1438                         (unsigned long long)capacity,
1439                         (unsigned long long)sector, data_size);
1440                 return NULL;
1441         }
1442
1443         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1444          * "criss-cross" setup, that might cause write-out on some other DRBD,
1445          * which in turn might block on the other node at this very place.  */
1446         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1447         if (!peer_req)
1448                 return NULL;
1449
1450         ds = data_size;
1451         page = peer_req->pages;
1452         page_chain_for_each(page) {
1453                 unsigned len = min_t(int, ds, PAGE_SIZE);
1454                 data = kmap(page);
1455                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1456                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1457                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1458                         data[0] = data[0] ^ (unsigned long)-1;
1459                 }
1460                 kunmap(page);
1461                 if (err) {
1462                         drbd_free_peer_req(mdev, peer_req);
1463                         return NULL;
1464                 }
1465                 ds -= len;
1466         }
1467
1468         if (dgs) {
1469                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1470                 if (memcmp(dig_in, dig_vv, dgs)) {
1471                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1472                                 (unsigned long long)sector, data_size);
1473                         drbd_free_peer_req(mdev, peer_req);
1474                         return NULL;
1475                 }
1476         }
1477         mdev->recv_cnt += data_size>>9;
1478         return peer_req;
1479 }
1480
1481 /* drbd_drain_block() just takes a data block
1482  * out of the socket input buffer, and discards it.
1483  */
1484 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1485 {
1486         struct page *page;
1487         int err = 0;
1488         void *data;
1489
1490         if (!data_size)
1491                 return 0;
1492
        page = drbd_alloc_pages(mdev, 1, 1);
        if (!page) /* may be NULL if we were interrupted while waiting */
                return -EIO;

1495         data = kmap(page);
1496         while (data_size) {
1497                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1498
1499                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1500                 if (err)
1501                         break;
1502                 data_size -= len;
1503         }
1504         kunmap(page);
1505         drbd_free_pages(mdev, page, 0);
1506         return err;
1507 }
1508
1509 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1510                            sector_t sector, int data_size)
1511 {
1512         struct bio_vec *bvec;
1513         struct bio *bio;
1514         int dgs, err, i, expect;
1515         void *dig_in = mdev->tconn->int_dig_in;
1516         void *dig_vv = mdev->tconn->int_dig_vv;
1517
1518         dgs = 0;
1519         if (mdev->tconn->peer_integrity_tfm) {
1520                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1521                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1522                 if (err)
1523                         return err;
1524                 data_size -= dgs;
1525         }
1526
1527         /* optimistically update recv_cnt.  if receiving fails below,
1528          * we disconnect anyways, and counters will be reset. */
1529         mdev->recv_cnt += data_size>>9;
1530
1531         bio = req->master_bio;
1532         D_ASSERT(sector == bio->bi_sector);
1533
1534         bio_for_each_segment(bvec, bio, i) {
1535                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1536                 expect = min_t(int, data_size, bvec->bv_len);
1537                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1538                 kunmap(bvec->bv_page);
1539                 if (err)
1540                         return err;
1541                 data_size -= expect;
1542         }
1543
1544         if (dgs) {
1545                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1546                 if (memcmp(dig_in, dig_vv, dgs)) {
1547                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1548                         return -EINVAL;
1549                 }
1550         }
1551
1552         D_ASSERT(data_size == 0);
1553         return 0;
1554 }
1555
1556 /*
1557  * e_end_resync_block() is called in asender context via
1558  * drbd_finish_peer_reqs().
1559  */
1560 static int e_end_resync_block(struct drbd_work *w, int unused)
1561 {
1562         struct drbd_peer_request *peer_req =
1563                 container_of(w, struct drbd_peer_request, w);
1564         struct drbd_conf *mdev = w->mdev;
1565         sector_t sector = peer_req->i.sector;
1566         int err;
1567
1568         D_ASSERT(drbd_interval_empty(&peer_req->i));
1569
1570         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1571                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1572                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1573         } else {
1574                 /* Record failure to sync */
1575                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1576
                err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1578         }
1579         dec_unacked(mdev);
1580
1581         return err;
1582 }
1583
1584 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1585 {
1586         struct drbd_peer_request *peer_req;
1587
1588         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1589         if (!peer_req)
1590                 goto fail;
1591
1592         dec_rs_pending(mdev);
1593
1594         inc_unacked(mdev);
        /* corresponding dec_unacked() in e_end_resync_block(),
         * or in _drbd_clear_done_ee(), respectively */
1597
1598         peer_req->w.cb = e_end_resync_block;
1599
1600         spin_lock_irq(&mdev->tconn->req_lock);
1601         list_add(&peer_req->w.list, &mdev->sync_ee);
1602         spin_unlock_irq(&mdev->tconn->req_lock);
1603
1604         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1605         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1606                 return 0;
1607
1608         /* don't care for the reason here */
1609         dev_err(DEV, "submit failed, triggering re-connect\n");
1610         spin_lock_irq(&mdev->tconn->req_lock);
1611         list_del(&peer_req->w.list);
1612         spin_unlock_irq(&mdev->tconn->req_lock);
1613
1614         drbd_free_peer_req(mdev, peer_req);
1615 fail:
1616         put_ldev(mdev);
1617         return -EIO;
1618 }
1619
1620 static struct drbd_request *
1621 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1622              sector_t sector, bool missing_ok, const char *func)
1623 {
1624         struct drbd_request *req;
1625
        /* Request object according to our peer: block_id carries the request
         * pointer we sent out earlier.  Note that drbd_contains_interval()
         * only compares the address of &req->i against nodes already in the
         * interval tree, so this unvalidated pointer is not dereferenced
         * before the lookup succeeds. */
1627         req = (struct drbd_request *)(unsigned long)id;
1628         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1629                 return req;
1630         if (!missing_ok) {
1631                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1632                         (unsigned long)id, (unsigned long long)sector);
1633         }
1634         return NULL;
1635 }
1636
1637 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1638 {
1639         struct drbd_conf *mdev;
1640         struct drbd_request *req;
1641         sector_t sector;
1642         int err;
1643         struct p_data *p = pi->data;
1644
1645         mdev = vnr_to_mdev(tconn, pi->vnr);
1646         if (!mdev)
1647                 return -EIO;
1648
1649         sector = be64_to_cpu(p->sector);
1650
1651         spin_lock_irq(&mdev->tconn->req_lock);
1652         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1653         spin_unlock_irq(&mdev->tconn->req_lock);
1654         if (unlikely(!req))
1655                 return -EIO;
1656
        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
         * special casing it in the various failure paths.
         * Still no race with drbd_fail_pending_reads. */
1660         err = recv_dless_read(mdev, req, sector, pi->size);
1661         if (!err)
1662                 req_mod(req, DATA_RECEIVED);
1663         /* else: nothing. handled from drbd_disconnect...
1664          * I don't think we may complete this just yet
1665          * in case we are "on-disconnect: freeze" */
1666
1667         return err;
1668 }
1669
1670 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1671 {
1672         struct drbd_conf *mdev;
1673         sector_t sector;
1674         int err;
1675         struct p_data *p = pi->data;
1676
1677         mdev = vnr_to_mdev(tconn, pi->vnr);
1678         if (!mdev)
1679                 return -EIO;
1680
1681         sector = be64_to_cpu(p->sector);
1682         D_ASSERT(p->block_id == ID_SYNCER);
1683
1684         if (get_ldev(mdev)) {
1685                 /* data is submitted to disk within recv_resync_read.
1686                  * corresponding put_ldev done below on error,
1687                  * or in drbd_peer_request_endio. */
1688                 err = recv_resync_read(mdev, sector, pi->size);
1689         } else {
1690                 if (__ratelimit(&drbd_ratelimit_state))
1691                         dev_err(DEV, "Can not write resync data to local disk.\n");
1692
1693                 err = drbd_drain_block(mdev, pi->size);
1694
1695                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1696         }
1697
1698         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1699
1700         return err;
1701 }
1702
1703 static int w_restart_write(struct drbd_work *w, int cancel)
1704 {
1705         struct drbd_request *req = container_of(w, struct drbd_request, w);
1706         struct drbd_conf *mdev = w->mdev;
1707         struct bio *bio;
1708         unsigned long start_time;
1709         unsigned long flags;
1710
1711         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1712         if (!expect(req->rq_state & RQ_POSTPONED)) {
1713                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1714                 return -EIO;
1715         }
1716         bio = req->master_bio;
1717         start_time = req->start_time;
1718         /* Postponed requests will not have their master_bio completed!  */
1719         __req_mod(req, DISCARD_WRITE, NULL);
1720         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1721
1722         while (__drbd_make_request(mdev, bio, start_time))
1723                 /* retry */ ;
1724         return 0;
1725 }
1726
1727 static void restart_conflicting_writes(struct drbd_conf *mdev,
1728                                        sector_t sector, int size)
1729 {
1730         struct drbd_interval *i;
1731         struct drbd_request *req;
1732
1733         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1734                 if (!i->local)
1735                         continue;
1736                 req = container_of(i, struct drbd_request, i);
1737                 if (req->rq_state & RQ_LOCAL_PENDING ||
1738                     !(req->rq_state & RQ_POSTPONED))
1739                         continue;
1740                 if (expect(list_empty(&req->w.list))) {
1741                         req->w.mdev = mdev;
1742                         req->w.cb = w_restart_write;
1743                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1744                 }
1745         }
1746 }
1747
1748 /*
1749  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1750  */
1751 static int e_end_block(struct drbd_work *w, int cancel)
1752 {
1753         struct drbd_peer_request *peer_req =
1754                 container_of(w, struct drbd_peer_request, w);
1755         struct drbd_conf *mdev = w->mdev;
1756         sector_t sector = peer_req->i.sector;
1757         int err = 0, pcmd;
1758
1759         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1760                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1761                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1762                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1763                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1764                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1765                         err = drbd_send_ack(mdev, pcmd, peer_req);
1766                         if (pcmd == P_RS_WRITE_ACK)
1767                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1768                 } else {
1769                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1770                         /* we expect it to be marked out of sync anyways...
1771                          * maybe assert this?  */
1772                 }
1773                 dec_unacked(mdev);
1774         }
1775         /* we delete from the conflict detection hash _after_ we sent out the
1776          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1777         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1778                 spin_lock_irq(&mdev->tconn->req_lock);
1779                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1780                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1781                 if (peer_req->flags & EE_RESTART_REQUESTS)
1782                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1783                 spin_unlock_irq(&mdev->tconn->req_lock);
1784         } else
1785                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1786
1787         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1788
1789         return err;
1790 }
1791
1792 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1793 {
1794         struct drbd_conf *mdev = w->mdev;
1795         struct drbd_peer_request *peer_req =
1796                 container_of(w, struct drbd_peer_request, w);
1797         int err;
1798
1799         err = drbd_send_ack(mdev, ack, peer_req);
1800         dec_unacked(mdev);
1801
1802         return err;
1803 }
1804
1805 static int e_send_discard_write(struct drbd_work *w, int unused)
1806 {
1807         return e_send_ack(w, P_DISCARD_WRITE);
1808 }
1809
1810 static int e_send_retry_write(struct drbd_work *w, int unused)
1811 {
1812         struct drbd_tconn *tconn = w->mdev->tconn;
1813
1814         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1815                              P_RETRY_WRITE : P_DISCARD_WRITE);
1816 }
1817
1818 static bool seq_greater(u32 a, u32 b)
1819 {
1820         /*
1821          * We assume 32-bit wrap-around here.
1822          * For 24-bit wrap-around, we would have to shift:
1823          *  a <<= 8; b <<= 8;
1824          */
1825         return (s32)a - (s32)b > 0;
1826 }
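/*
 * Example (illustrative): seq_greater(1, 0xffffffff) is true, because
 * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0; sequence number 1 counts
 * as "newer" than 0xffffffff across the 32-bit wrap.  Conversely,
 * seq_greater(0xffffffff, 1) is false.
 */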
1827
1828 static u32 seq_max(u32 a, u32 b)
1829 {
1830         return seq_greater(a, b) ? a : b;
1831 }
1832
1833 static bool need_peer_seq(struct drbd_conf *mdev)
1834 {
1835         struct drbd_tconn *tconn = mdev->tconn;
1836         int tp;
1837
1838         /*
1839          * We only need to keep track of the last packet_seq number of our peer
1840          * if we are in dual-primary mode and we have the discard flag set; see
1841          * handle_write_conflicts().
1842          */
1843
1844         rcu_read_lock();
1845         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1846         rcu_read_unlock();
1847
1848         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1849 }
1850
1851 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1852 {
1853         unsigned int newest_peer_seq;
1854
1855         if (need_peer_seq(mdev)) {
1856                 spin_lock(&mdev->peer_seq_lock);
1857                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1858                 mdev->peer_seq = newest_peer_seq;
1859                 spin_unlock(&mdev->peer_seq_lock);
1860                 /* wake up only if we actually changed mdev->peer_seq */
1861                 if (peer_seq == newest_peer_seq)
1862                         wake_up(&mdev->seq_wait);
1863         }
1864 }
1865
1866 /* Called from receive_Data.
1867  * Synchronize packets on sock with packets on msock.
1868  *
1869  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1870  * packet traveling on msock, they are still processed in the order they have
1871  * been sent.
1872  *
1873  * Note: we don't care for Ack packets overtaking P_DATA packets.
1874  *
 * In case peer_seq is larger than mdev->peer_seq, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and
 * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1884  *
1885  * returns 0 if we may process the packet,
1886  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1887 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1888 {
1889         DEFINE_WAIT(wait);
1890         long timeout;
1891         int ret;
1892
1893         if (!need_peer_seq(mdev))
1894                 return 0;
1895
1896         spin_lock(&mdev->peer_seq_lock);
1897         for (;;) {
1898                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1899                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1900                         ret = 0;
1901                         break;
1902                 }
1903                 if (signal_pending(current)) {
1904                         ret = -ERESTARTSYS;
1905                         break;
1906                 }
1907                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1908                 spin_unlock(&mdev->peer_seq_lock);
1909                 rcu_read_lock();
1910                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1911                 rcu_read_unlock();
1912                 timeout = schedule_timeout(timeout);
1913                 spin_lock(&mdev->peer_seq_lock);
1914                 if (!timeout) {
1915                         ret = -ETIMEDOUT;
1916                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1917                         break;
1918                 }
1919         }
1920         spin_unlock(&mdev->peer_seq_lock);
1921         finish_wait(&mdev->seq_wait, &wait);
1922         return ret;
1923 }
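/*
 * Worked example (values are illustrative): mdev->peer_seq is 5 and a
 * P_DATA packet with peer_seq == 7 arrives on sock.  Since seq_greater(6, 5),
 * the packet with seq 6 is still in flight on msock, so we sleep.  Once the
 * ack packet carrying seq 6 has been processed on msock, update_peer_seq()
 * bumps mdev->peer_seq to 6 and wakes us; now 6 <= 6 holds, we set
 * mdev->peer_seq to max(6, 7) == 7 and go on to process the data packet.
 */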
1924
/* see also bio_flags_to_wire():
 * we need to semantically map the data packet flags (DP_*) back to bio
 * flags (REQ_*, named DRBD_REQ_* where we replicate to other kernel
 * versions). */
1928 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1929 {
1930         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1931                 (dpf & DP_FUA ? REQ_FUA : 0) |
1932                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1933                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1934 }
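/*
 * Example (illustrative): a peer write sent with DP_RW_SYNC | DP_FUA in
 * p_data->dp_flags is resubmitted locally with REQ_SYNC | REQ_FUA, so
 * sync/FUA semantics survive the trip over the wire.
 */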
1935
1936 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1937                                     unsigned int size)
1938 {
1939         struct drbd_interval *i;
1940
1941     repeat:
1942         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1943                 struct drbd_request *req;
1944                 struct bio_and_error m;
1945
1946                 if (!i->local)
1947                         continue;
1948                 req = container_of(i, struct drbd_request, i);
1949                 if (!(req->rq_state & RQ_POSTPONED))
1950                         continue;
1951                 req->rq_state &= ~RQ_POSTPONED;
1952                 __req_mod(req, NEG_ACKED, &m);
1953                 spin_unlock_irq(&mdev->tconn->req_lock);
1954                 if (m.bio)
1955                         complete_master_bio(mdev, &m);
1956                 spin_lock_irq(&mdev->tconn->req_lock);
1957                 goto repeat;
1958         }
1959 }
1960
1961 static int handle_write_conflicts(struct drbd_conf *mdev,
1962                                   struct drbd_peer_request *peer_req)
1963 {
1964         struct drbd_tconn *tconn = mdev->tconn;
1965         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1966         sector_t sector = peer_req->i.sector;
1967         const unsigned int size = peer_req->i.size;
1968         struct drbd_interval *i;
1969         bool equal;
1970         int err;
1971
1972         /*
1973          * Inserting the peer request into the write_requests tree will prevent
1974          * new conflicting local requests from being added.
1975          */
1976         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1977
1978     repeat:
1979         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1980                 if (i == &peer_req->i)
1981                         continue;
1982
1983                 if (!i->local) {
1984                         /*
1985                          * Our peer has sent a conflicting remote request; this
1986                          * should not happen in a two-node setup.  Wait for the
1987                          * earlier peer request to complete.
1988                          */
1989                         err = drbd_wait_misc(mdev, i);
1990                         if (err)
1991                                 goto out;
1992                         goto repeat;
1993                 }
1994
1995                 equal = i->sector == sector && i->size == size;
1996                 if (resolve_conflicts) {
1997                         /*
1998                          * If the peer request is fully contained within the
1999                          * overlapping request, it can be discarded; otherwise,
2000                          * it will be retried once all overlapping requests
2001                          * have completed.
2002                          */
2003                         bool discard = i->sector <= sector && i->sector +
2004                                        (i->size >> 9) >= sector + (size >> 9);
2005
2006                         if (!equal)
2007                                 dev_alert(DEV, "Concurrent writes detected: "
2008                                                "local=%llus +%u, remote=%llus +%u, "
2009                                                "assuming %s came first\n",
2010                                           (unsigned long long)i->sector, i->size,
2011                                           (unsigned long long)sector, size,
2012                                           discard ? "local" : "remote");
2013
2014                         inc_unacked(mdev);
2015                         peer_req->w.cb = discard ? e_send_discard_write :
2016                                                    e_send_retry_write;
2017                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2018                         wake_asender(mdev->tconn);
2019
2020                         err = -ENOENT;
2021                         goto out;
2022                 } else {
2023                         struct drbd_request *req =
2024                                 container_of(i, struct drbd_request, i);
2025
2026                         if (!equal)
2027                                 dev_alert(DEV, "Concurrent writes detected: "
2028                                                "local=%llus +%u, remote=%llus +%u\n",
2029                                           (unsigned long long)i->sector, i->size,
2030                                           (unsigned long long)sector, size);
2031
2032                         if (req->rq_state & RQ_LOCAL_PENDING ||
2033                             !(req->rq_state & RQ_POSTPONED)) {
2034                                 /*
2035                                  * Wait for the node with the discard flag to
2036                                  * decide if this request will be discarded or
2037                                  * retried.  Requests that are discarded will
2038                                  * disappear from the write_requests tree.
2039                                  *
2040                                  * In addition, wait for the conflicting
2041                                  * request to finish locally before submitting
2042                                  * the conflicting peer request.
2043                                  */
2044                                 err = drbd_wait_misc(mdev, &req->i);
2045                                 if (err) {
2046                                         _conn_request_state(mdev->tconn,
2047                                                             NS(conn, C_TIMEOUT),
2048                                                             CS_HARD);
2049                                         fail_postponed_requests(mdev, sector, size);
2050                                         goto out;
2051                                 }
2052                                 goto repeat;
2053                         }
2054                         /*
2055                          * Remember to restart the conflicting requests after
2056                          * the new peer request has completed.
2057                          */
2058                         peer_req->flags |= EE_RESTART_REQUESTS;
2059                 }
2060         }
2061         err = 0;
2062
2063     out:
2064         if (err)
2065                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2066         return err;
2067 }
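/*
 * Sketch of the resolution above, for the node that resolves conflicts
 * (DISCARD_CONCURRENT set):
 *   - the peer request lies fully inside a conflicting local request:
 *     discard it and answer with P_DISCARD_WRITE (e_send_discard_write);
 *   - partial overlap: ask the peer to retry the write once all overlapping
 *     requests have completed (e_send_retry_write).
 * The other node instead waits for these decisions in drbd_wait_misc().
 */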
2068
2069 /* mirrored write */
2070 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2071 {
2072         struct drbd_conf *mdev;
2073         sector_t sector;
2074         struct drbd_peer_request *peer_req;
2075         struct p_data *p = pi->data;
2076         u32 peer_seq = be32_to_cpu(p->seq_num);
2077         int rw = WRITE;
2078         u32 dp_flags;
2079         int err, tp;
2080
2081         mdev = vnr_to_mdev(tconn, pi->vnr);
2082         if (!mdev)
2083                 return -EIO;
2084
2085         if (!get_ldev(mdev)) {
2086                 int err2;
2087
2088                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2089                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2090                 atomic_inc(&mdev->current_epoch->epoch_size);
2091                 err2 = drbd_drain_block(mdev, pi->size);
2092                 if (!err)
2093                         err = err2;
2094                 return err;
2095         }
2096
2097         /*
2098          * Corresponding put_ldev done either below (on various errors), or in
2099          * drbd_peer_request_endio, if we successfully submit the data at the
2100          * end of this function.
2101          */
2102
2103         sector = be64_to_cpu(p->sector);
2104         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2105         if (!peer_req) {
2106                 put_ldev(mdev);
2107                 return -EIO;
2108         }
2109
2110         peer_req->w.cb = e_end_block;
2111
2112         dp_flags = be32_to_cpu(p->dp_flags);
2113         rw |= wire_flags_to_bio(mdev, dp_flags);
2114
2115         if (dp_flags & DP_MAY_SET_IN_SYNC)
2116                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2117
2118         spin_lock(&mdev->epoch_lock);
2119         peer_req->epoch = mdev->current_epoch;
2120         atomic_inc(&peer_req->epoch->epoch_size);
2121         atomic_inc(&peer_req->epoch->active);
2122         spin_unlock(&mdev->epoch_lock);
2123
2124         rcu_read_lock();
2125         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2126         rcu_read_unlock();
2127         if (tp) {
2128                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2129                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2130                 if (err)
2131                         goto out_interrupted;
2132                 spin_lock_irq(&mdev->tconn->req_lock);
2133                 err = handle_write_conflicts(mdev, peer_req);
2134                 if (err) {
2135                         spin_unlock_irq(&mdev->tconn->req_lock);
2136                         if (err == -ENOENT) {
2137                                 put_ldev(mdev);
2138                                 return 0;
2139                         }
2140                         goto out_interrupted;
2141                 }
2142         } else
2143                 spin_lock_irq(&mdev->tconn->req_lock);
2144         list_add(&peer_req->w.list, &mdev->active_ee);
2145         spin_unlock_irq(&mdev->tconn->req_lock);
2146
2147         if (mdev->tconn->agreed_pro_version < 100) {
2148                 rcu_read_lock();
2149                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2150                 case DRBD_PROT_C:
2151                         dp_flags |= DP_SEND_WRITE_ACK;
2152                         break;
2153                 case DRBD_PROT_B:
2154                         dp_flags |= DP_SEND_RECEIVE_ACK;
2155                         break;
2156                 }
2157                 rcu_read_unlock();
2158         }
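        /*
         * Resulting ack policy (sketch): a protocol C peer expects
         * P_WRITE_ACK once the write is on stable storage
         * (DP_SEND_WRITE_ACK); a protocol B peer expects P_RECV_ACK upon
         * receipt (DP_SEND_RECEIVE_ACK); a protocol A peer expects no ack
         * at all.  Peers speaking protocol version 100 or newer set these
         * flags themselves on the sending side.
         */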
2159
2160         if (dp_flags & DP_SEND_WRITE_ACK) {
2161                 peer_req->flags |= EE_SEND_WRITE_ACK;
2162                 inc_unacked(mdev);
                /* corresponding dec_unacked() in e_end_block(),
                 * or in _drbd_clear_done_ee(), respectively */
2165         }
2166
2167         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2168                 /* I really don't like it that the receiver thread
2169                  * sends on the msock, but anyways */
2170                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2171         }
2172
2173         if (mdev->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster: the peer cannot
                 * store this data, so mark these blocks out of sync. */
2175                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2176                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2177                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2178                 drbd_al_begin_io(mdev, &peer_req->i);
2179         }
2180
2181         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2182         if (!err)
2183                 return 0;
2184
2185         /* don't care for the reason here */
2186         dev_err(DEV, "submit failed, triggering re-connect\n");
2187         spin_lock_irq(&mdev->tconn->req_lock);
2188         list_del(&peer_req->w.list);
2189         drbd_remove_epoch_entry_interval(mdev, peer_req);
2190         spin_unlock_irq(&mdev->tconn->req_lock);
2191         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2192                 drbd_al_complete_io(mdev, &peer_req->i);
2193
2194 out_interrupted:
2195         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2196         put_ldev(mdev);
2197         drbd_free_peer_req(mdev, peer_req);
2198         return err;
2199 }
2200
2201 /* We may throttle resync, if the lower device seems to be busy,
2202  * and current sync rate is above c_min_rate.
2203  *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2206  * (more than 64 sectors) of activity we cannot account for with our own resync
2207  * activity, it obviously is "busy".
2208  *
2209  * The current sync rate used here uses only the most recent two step marks,
2210  * to have a short time average so we can react faster.
2211  */
2212 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2213 {
2214         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2215         unsigned long db, dt, dbdt;
2216         struct lc_element *tmp;
2217         int curr_events;
2218         int throttle = 0;
2219         unsigned int c_min_rate;
2220
2221         rcu_read_lock();
2222         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2223         rcu_read_unlock();
2224
2225         /* feature disabled? */
2226         if (c_min_rate == 0)
2227                 return 0;
2228
2229         spin_lock_irq(&mdev->al_lock);
2230         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2231         if (tmp) {
2232                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2233                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2234                         spin_unlock_irq(&mdev->al_lock);
2235                         return 0;
2236                 }
2237                 /* Do not slow down if app IO is already waiting for this extent */
2238         }
2239         spin_unlock_irq(&mdev->al_lock);
2240
2241         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2242                       (int)part_stat_read(&disk->part0, sectors[1]) -
2243                         atomic_read(&mdev->rs_sect_ev);
2244
2245         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2246                 unsigned long rs_left;
2247                 int i;
2248
2249                 mdev->rs_last_events = curr_events;
2250
2251                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2252                  * approx. */
2253                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2254
2255                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2256                         rs_left = mdev->ov_left;
2257                 else
2258                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2259
2260                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2261                 if (!dt)
2262                         dt++;
2263                 db = mdev->rs_mark_left[i] - rs_left;
2264                 dbdt = Bit2KB(db/dt);
2265
2266                 if (dbdt > c_min_rate)
2267                         throttle = 1;
2268         }
2269         return throttle;
2270 }
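/*
 * Worked example (numbers are illustrative): suppose the two most recent
 * sync marks are dt == 6 seconds apart and db == 30720 bitmap bits were
 * cleared in between.  With one bit covering 4 KiB (BM_BLOCK_SIZE),
 * dbdt == Bit2KB(30720 / 6) == 5120 * 4 == 20480 KiB/s.  Given a configured
 * c_min_rate of, say, 4096 KiB/s (and enough unaccounted disk activity),
 * dbdt > c_min_rate and we throttle.
 */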
2271
2272
2273 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2274 {
2275         struct drbd_conf *mdev;
2276         sector_t sector;
2277         sector_t capacity;
2278         struct drbd_peer_request *peer_req;
2279         struct digest_info *di = NULL;
2280         int size, verb;
2281         unsigned int fault_type;
2282         struct p_block_req *p = pi->data;
2283
2284         mdev = vnr_to_mdev(tconn, pi->vnr);
2285         if (!mdev)
2286                 return -EIO;
2287         capacity = drbd_get_capacity(mdev->this_bdev);
2288
2289         sector = be64_to_cpu(p->sector);
2290         size   = be32_to_cpu(p->blksize);
2291
2292         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2293                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2294                                 (unsigned long long)sector, size);
2295                 return -EINVAL;
2296         }
2297         if (sector + (size>>9) > capacity) {
2298                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2299                                 (unsigned long long)sector, size);
2300                 return -EINVAL;
2301         }
2302
2303         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2304                 verb = 1;
2305                 switch (pi->cmd) {
2306                 case P_DATA_REQUEST:
2307                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2308                         break;
2309                 case P_RS_DATA_REQUEST:
2310                 case P_CSUM_RS_REQUEST:
2311                 case P_OV_REQUEST:
                        drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2313                         break;
2314                 case P_OV_REPLY:
2315                         verb = 0;
2316                         dec_rs_pending(mdev);
2317                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2318                         break;
2319                 default:
2320                         BUG();
2321                 }
2322                 if (verb && __ratelimit(&drbd_ratelimit_state))
2323                         dev_err(DEV, "Can not satisfy peer's read request, "
2324                             "no local data.\n");
2325
                /* drain possible payload */
2327                 return drbd_drain_block(mdev, pi->size);
2328         }
2329
2330         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2331          * "criss-cross" setup, that might cause write-out on some other DRBD,
2332          * which in turn might block on the other node at this very place.  */
2333         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2334         if (!peer_req) {
2335                 put_ldev(mdev);
2336                 return -ENOMEM;
2337         }
2338
2339         switch (pi->cmd) {
2340         case P_DATA_REQUEST:
2341                 peer_req->w.cb = w_e_end_data_req;
2342                 fault_type = DRBD_FAULT_DT_RD;
2343                 /* application IO, don't drbd_rs_begin_io */
2344                 goto submit;
2345
2346         case P_RS_DATA_REQUEST:
2347                 peer_req->w.cb = w_e_end_rsdata_req;
2348                 fault_type = DRBD_FAULT_RS_RD;
2349                 /* used in the sector offset progress display */
2350                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2351                 break;
2352
2353         case P_OV_REPLY:
2354         case P_CSUM_RS_REQUEST:
2355                 fault_type = DRBD_FAULT_RS_RD;
2356                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2357                 if (!di)
2358                         goto out_free_e;
2359
2360                 di->digest_size = pi->size;
2361                 di->digest = (((char *)di)+sizeof(struct digest_info));
2362
2363                 peer_req->digest = di;
2364                 peer_req->flags |= EE_HAS_DIGEST;
2365
2366                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2367                         goto out_free_e;
2368
2369                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2370                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2371                         peer_req->w.cb = w_e_end_csum_rs_req;
2372                         /* used in the sector offset progress display */
2373                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2374                 } else if (pi->cmd == P_OV_REPLY) {
2375                         /* track progress, we may need to throttle */
2376                         atomic_add(size >> 9, &mdev->rs_sect_in);
2377                         peer_req->w.cb = w_e_end_ov_reply;
2378                         dec_rs_pending(mdev);
2379                         /* drbd_rs_begin_io done when we sent this request,
2380                          * but accounting still needs to be done. */
2381                         goto submit_for_resync;
2382                 }
2383                 break;
2384
2385         case P_OV_REQUEST:
2386                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2387                     mdev->tconn->agreed_pro_version >= 90) {
2388                         unsigned long now = jiffies;
2389                         int i;
2390                         mdev->ov_start_sector = sector;
2391                         mdev->ov_position = sector;
2392                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2393                         mdev->rs_total = mdev->ov_left;
2394                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2395                                 mdev->rs_mark_left[i] = mdev->ov_left;
2396                                 mdev->rs_mark_time[i] = now;
2397                         }
2398                         dev_info(DEV, "Online Verify start sector: %llu\n",
2399                                         (unsigned long long)sector);
2400                 }
2401                 peer_req->w.cb = w_e_end_ov_req;
2402                 fault_type = DRBD_FAULT_RS_RD;
2403                 break;
2404
2405         default:
2406                 BUG();
2407         }
2408
2409         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2410          * wrt the receiver, but it is not as straightforward as it may seem.
 * Various places in the resync start and stop logic assume resync
 * requests are processed in order; requeuing this on the worker thread
 * would introduce a bunch of new code for synchronization between threads.
2414          *
2415          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2416          * "forever", throttling after drbd_rs_begin_io will lock that extent
2417          * for application writes for the same time.  For now, just throttle
2418          * here, where the rest of the code expects the receiver to sleep for
2419          * a while, anyways.
2420          */
2421
2422         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2423          * this defers syncer requests for some time, before letting at least
 * one request through.  The resync controller on the receiving side
2425          * will adapt to the incoming rate accordingly.
2426          *
2427          * We cannot throttle here if remote is Primary/SyncTarget:
2428          * we would also throttle its application reads.
2429          * In that case, throttling is done on the SyncTarget only.
2430          */
2431         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2432                 schedule_timeout_uninterruptible(HZ/10);
2433         if (drbd_rs_begin_io(mdev, sector))
2434                 goto out_free_e;
2435
2436 submit_for_resync:
2437         atomic_add(size >> 9, &mdev->rs_sect_ev);
2438
2439 submit:
2440         inc_unacked(mdev);
2441         spin_lock_irq(&mdev->tconn->req_lock);
2442         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2443         spin_unlock_irq(&mdev->tconn->req_lock);
2444
2445         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2446                 return 0;
2447
2448         /* don't care for the reason here */
2449         dev_err(DEV, "submit failed, triggering re-connect\n");
2450         spin_lock_irq(&mdev->tconn->req_lock);
2451         list_del(&peer_req->w.list);
2452         spin_unlock_irq(&mdev->tconn->req_lock);
2453         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2454
2455 out_free_e:
2456         put_ldev(mdev);
2457         drbd_free_peer_req(mdev, peer_req);
2458         return -EIO;
2459 }
2460
2461 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2462 {
2463         int self, peer, rv = -100;
2464         unsigned long ch_self, ch_peer;
2465         enum drbd_after_sb_p after_sb_0p;
2466
2467         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2468         peer = mdev->p_uuid[UI_BITMAP] & 1;
2469
2470         ch_peer = mdev->p_uuid[UI_SIZE];
2471         ch_self = mdev->comm_bm_set;
2472
2473         rcu_read_lock();
2474         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2475         rcu_read_unlock();
2476         switch (after_sb_0p) {
2477         case ASB_CONSENSUS:
2478         case ASB_DISCARD_SECONDARY:
2479         case ASB_CALL_HELPER:
2480         case ASB_VIOLENTLY:
2481                 dev_err(DEV, "Configuration error.\n");
2482                 break;
2483         case ASB_DISCONNECT:
2484                 break;
2485         case ASB_DISCARD_YOUNGER_PRI:
2486                 if (self == 0 && peer == 1) {
2487                         rv = -1;
2488                         break;
2489                 }
2490                 if (self == 1 && peer == 0) {
2491                         rv =  1;
2492                         break;
2493                 }
2494                 /* Else fall through to one of the other strategies... */
2495         case ASB_DISCARD_OLDER_PRI:
2496                 if (self == 0 && peer == 1) {
2497                         rv = 1;
2498                         break;
2499                 }
2500                 if (self == 1 && peer == 0) {
2501                         rv = -1;
2502                         break;
2503                 }
2504                 /* Else fall through to one of the other strategies... */
2505                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2506                      "Using discard-least-changes instead\n");
2507         case ASB_DISCARD_ZERO_CHG:
2508                 if (ch_peer == 0 && ch_self == 0) {
2509                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2510                                 ? -1 : 1;
2511                         break;
2512                 } else {
2513                         if (ch_peer == 0) { rv =  1; break; }
2514                         if (ch_self == 0) { rv = -1; break; }
2515                 }
2516                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2517                         break;
2518         case ASB_DISCARD_LEAST_CHG:
2519                 if      (ch_self < ch_peer)
2520                         rv = -1;
2521                 else if (ch_self > ch_peer)
2522                         rv =  1;
2523                 else /* ( ch_self == ch_peer ) */
2524                      /* Well, then use something else. */
2525                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2526                                 ? -1 : 1;
2527                 break;
2528         case ASB_DISCARD_LOCAL:
2529                 rv = -1;
2530                 break;
2531         case ASB_DISCARD_REMOTE:
2532                 rv =  1;
2533         }
2534
2535         return rv;
2536 }
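/*
 * Example (illustrative): with after-sb-0pri set to discard-least-changes,
 * a node that set 100 bits in its bitmap during the split brain loses to a
 * peer that set 1000 bits: ch_self < ch_peer yields rv == -1, i.e. this
 * node becomes sync target and discards its own modifications.
 */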
2537
2538 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2539 {
2540         int hg, rv = -100;
2541         enum drbd_after_sb_p after_sb_1p;
2542
2543         rcu_read_lock();
2544         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2545         rcu_read_unlock();
2546         switch (after_sb_1p) {
2547         case ASB_DISCARD_YOUNGER_PRI:
2548         case ASB_DISCARD_OLDER_PRI:
2549         case ASB_DISCARD_LEAST_CHG:
2550         case ASB_DISCARD_LOCAL:
2551         case ASB_DISCARD_REMOTE:
2552         case ASB_DISCARD_ZERO_CHG:
2553                 dev_err(DEV, "Configuration error.\n");
2554                 break;
2555         case ASB_DISCONNECT:
2556                 break;
2557         case ASB_CONSENSUS:
2558                 hg = drbd_asb_recover_0p(mdev);
2559                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2560                         rv = hg;
2561                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2562                         rv = hg;
2563                 break;
2564         case ASB_VIOLENTLY:
2565                 rv = drbd_asb_recover_0p(mdev);
2566                 break;
2567         case ASB_DISCARD_SECONDARY:
2568                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2569         case ASB_CALL_HELPER:
2570                 hg = drbd_asb_recover_0p(mdev);
2571                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2572                         enum drbd_state_rv rv2;
2573
2574                         drbd_set_role(mdev, R_SECONDARY, 0);
2575                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2576                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2577                           * we do not need to wait for the after state change work either. */
2578                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2579                         if (rv2 != SS_SUCCESS) {
2580                                 drbd_khelper(mdev, "pri-lost-after-sb");
2581                         } else {
2582                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2583                                 rv = hg;
2584                         }
2585                 } else
2586                         rv = hg;
2587         }
2588
2589         return rv;
2590 }
2591
2592 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2593 {
2594         int hg, rv = -100;
2595         enum drbd_after_sb_p after_sb_2p;
2596
2597         rcu_read_lock();
2598         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2599         rcu_read_unlock();
2600         switch (after_sb_2p) {
2601         case ASB_DISCARD_YOUNGER_PRI:
2602         case ASB_DISCARD_OLDER_PRI:
2603         case ASB_DISCARD_LEAST_CHG:
2604         case ASB_DISCARD_LOCAL:
2605         case ASB_DISCARD_REMOTE:
2606         case ASB_CONSENSUS:
2607         case ASB_DISCARD_SECONDARY:
2608         case ASB_DISCARD_ZERO_CHG:
2609                 dev_err(DEV, "Configuration error.\n");
2610                 break;
2611         case ASB_VIOLENTLY:
2612                 rv = drbd_asb_recover_0p(mdev);
2613                 break;
2614         case ASB_DISCONNECT:
2615                 break;
2616         case ASB_CALL_HELPER:
2617                 hg = drbd_asb_recover_0p(mdev);
2618                 if (hg == -1) {
2619                         enum drbd_state_rv rv2;
2620
2621                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2622                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2623                           * we do not need to wait for the after state change work either. */
2624                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2625                         if (rv2 != SS_SUCCESS) {
2626                                 drbd_khelper(mdev, "pri-lost-after-sb");
2627                         } else {
2628                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2629                                 rv = hg;
2630                         }
2631                 } else
2632                         rv = hg;
2633         }
2634
2635         return rv;
2636 }
2637
2638 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2639                            u64 bits, u64 flags)
2640 {
2641         if (!uuid) {
2642                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2643                 return;
2644         }
2645         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2646              text,
2647              (unsigned long long)uuid[UI_CURRENT],
2648              (unsigned long long)uuid[UI_BITMAP],
2649              (unsigned long long)uuid[UI_HISTORY_START],
2650              (unsigned long long)uuid[UI_HISTORY_END],
2651              (unsigned long long)bits,
2652              (unsigned long long)flags);
2653 }
2654
2655 /*
2656   100   after split brain try auto recover
2657     2   C_SYNC_SOURCE set BitMap
2658     1   C_SYNC_SOURCE use BitMap
2659     0   no Sync
2660    -1   C_SYNC_TARGET use BitMap
2661    -2   C_SYNC_TARGET set BitMap
2662  -100   after split brain, disconnect
2663 -1000   unrelated data
2664 -1091   requires proto 91
2665 -1096   requires proto 96
2666  */
2667 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2668 {
2669         u64 self, peer;
2670         int i, j;
2671
2672         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2673         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2674
2675         *rule_nr = 10;
2676         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2677                 return 0;
2678
2679         *rule_nr = 20;
2680         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2681              peer != UUID_JUST_CREATED)
2682                 return -2;
2683
2684         *rule_nr = 30;
2685         if (self != UUID_JUST_CREATED &&
2686             (peer == UUID_JUST_CREATED || peer == (u64)0))
2687                 return 2;
2688
2689         if (self == peer) {
2690                 int rct, dc; /* roles at crash time */
2691
2692                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2693
2694                         if (mdev->tconn->agreed_pro_version < 91)
2695                                 return -1091;
2696
2697                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2698                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2699                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2700                                 drbd_uuid_set_bm(mdev, 0UL);
2701
2702                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2703                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2704                                 *rule_nr = 34;
2705                         } else {
2706                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2707                                 *rule_nr = 36;
2708                         }
2709
2710                         return 1;
2711                 }
2712
2713                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2714
2715                         if (mdev->tconn->agreed_pro_version < 91)
2716                                 return -1091;
2717
2718                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2719                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2720                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2721
2722                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2723                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2724                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2725
2726                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2727                                 *rule_nr = 35;
2728                         } else {
2729                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2730                                 *rule_nr = 37;
2731                         }
2732
2733                         return -1;
2734                 }
2735
2736                 /* Common power [off|failure] */
2737                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2738                         (mdev->p_uuid[UI_FLAGS] & 2);
2739                 /* lowest bit is set when we were primary,
2740                  * next bit (weight 2) is set when peer was primary */
2741                 *rule_nr = 40;
2742
2743                 switch (rct) {
2744                 case 0: /* !self_pri && !peer_pri */ return 0;
2745                 case 1: /*  self_pri && !peer_pri */ return 1;
2746                 case 2: /* !self_pri &&  peer_pri */ return -1;
2747                 case 3: /*  self_pri &&  peer_pri */
2748                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2749                         return dc ? -1 : 1;
2750                 }
2751         }
2752
2753         *rule_nr = 50;
2754         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2755         if (self == peer)
2756                 return -1;
2757
2758         *rule_nr = 51;
2759         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2760         if (self == peer) {
2761                 if (mdev->tconn->agreed_pro_version < 96 ?
2762                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2763                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2764                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
                        /* The last P_SYNC_UUID did not get through. Undo the last
                           start-of-resync (as sync source) modifications of the
                           peer's UUIDs. */
2767
2768                         if (mdev->tconn->agreed_pro_version < 91)
2769                                 return -1091;
2770
2771                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2772                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2773
                        dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2775                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2776
2777                         return -1;
2778                 }
2779         }
2780
2781         *rule_nr = 60;
2782         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2783         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2784                 peer = mdev->p_uuid[i] & ~((u64)1);
2785                 if (self == peer)
2786                         return -2;
2787         }
2788
2789         *rule_nr = 70;
2790         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2791         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2792         if (self == peer)
2793                 return 1;
2794
2795         *rule_nr = 71;
2796         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2797         if (self == peer) {
2798                 if (mdev->tconn->agreed_pro_version < 96 ?
2799                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2800                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2801                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2802                         /* The last P_SYNC_UUID did not get through. Undo our own last
2803                            start-of-resync-as-sync-source modifications of our UUIDs. */
2804
2805                         if (mdev->tconn->agreed_pro_version < 91)
2806                                 return -1091;
2807
2808                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2809                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2810
2811                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2812                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2813                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2814
2815                         return 1;
2816                 }
2817         }
2818
2819
2820         *rule_nr = 80;
2821         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2822         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2823                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2824                 if (self == peer)
2825                         return 2;
2826         }
2827
2828         *rule_nr = 90;
2829         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2830         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2831         if (self == peer && self != ((u64)0))
2832                 return 100;
2833
2834         *rule_nr = 100;
2835         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2836                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2837                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2838                         peer = mdev->p_uuid[j] & ~((u64)1);
2839                         if (self == peer)
2840                                 return -100;
2841                 }
2842         }
2843
2844         return -1000;
2845 }
2846
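     /* Return-value convention of drbd_uuid_compare() above, as it is
      * interpreted by drbd_sync_handshake() below (a summary derived from
      * the rules, not an exhaustive spec):
      *      0        in sync, no resync necessary
      *      1 / -1   this node / the peer becomes sync source, bitmap-based resync
      *      2 / -2   this node / the peer becomes sync source, full resync
      *    100 / -100 split brain, resolved by policy or helper, or not at all
      *  -1000        unrelated data
      *  < -1000      both sides would need at least protocol version
      *               -(hg + 1000) to resolve the situation
      */
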
2847 /* drbd_sync_handshake() returns the new conn state on success, or
2848    C_MASK on failure.
2849  */
2850 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2851                                            enum drbd_disk_state peer_disk) __must_hold(local)
2852 {
2853         enum drbd_conns rv = C_MASK;
2854         enum drbd_disk_state mydisk;
2855         struct net_conf *nc;
2856         int hg, rule_nr, rr_conflict, tentative;
2857
2858         mydisk = mdev->state.disk;
2859         if (mydisk == D_NEGOTIATING)
2860                 mydisk = mdev->new_state_tmp.disk;
2861
2862         dev_info(DEV, "drbd_sync_handshake:\n");
2863         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2864         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2865                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2866
2867         hg = drbd_uuid_compare(mdev, &rule_nr);
2868
2869         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2870
2871         if (hg == -1000) {
2872                 dev_alert(DEV, "Unrelated data, aborting!\n");
2873                 return C_MASK;
2874         }
2875         if (hg < -1000) {
2876                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2877                 return C_MASK;
2878         }
2879
2880         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2881             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2882                 int f = (hg == -100) || abs(hg) == 2;
2883                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2884                 if (f)
2885                         hg = hg*2;
2886                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2887                      hg > 0 ? "source" : "target");
2888         }
2889
2890         if (abs(hg) == 100)
2891                 drbd_khelper(mdev, "initial-split-brain");
2892
2893         rcu_read_lock();
2894         nc = rcu_dereference(mdev->tconn->net_conf);
2895
2896         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2897                 int pcount = (mdev->state.role == R_PRIMARY)
2898                            + (peer_role == R_PRIMARY);
2899                 int forced = (hg == -100);
2900
2901                 switch (pcount) {
2902                 case 0:
2903                         hg = drbd_asb_recover_0p(mdev);
2904                         break;
2905                 case 1:
2906                         hg = drbd_asb_recover_1p(mdev);
2907                         break;
2908                 case 2:
2909                         hg = drbd_asb_recover_2p(mdev);
2910                         break;
2911                 }
2912                 if (abs(hg) < 100) {
2913                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2914                              "automatically solved. Sync from %s node\n",
2915                              pcount, (hg < 0) ? "peer" : "this");
2916                         if (forced) {
2917                                 dev_warn(DEV, "Doing a full sync, since"
2918                                      " UUIDs were ambiguous.\n");
2919                                 hg = hg*2;
2920                         }
2921                 }
2922         }
2923
2924         if (hg == -100) {
2925                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2926                         hg = -1;
2927                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2928                         hg = 1;
2929
2930                 if (abs(hg) < 100)
2931                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2932                              "Sync from %s node\n",
2933                              (hg < 0) ? "peer" : "this");
2934         }
2935         rr_conflict = nc->rr_conflict;
2936         tentative = nc->tentative;
2937         rcu_read_unlock();
2938
2939         if (hg == -100) {
2940                 /* FIXME this log message is not correct if we end up here
2941                  * after an attempted attach on a diskless node.
2942                  * We just refuse to attach -- well, we drop the "connection"
2943                  * to that disk, in a way... */
2944                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2945                 drbd_khelper(mdev, "split-brain");
2946                 return C_MASK;
2947         }
2948
2949         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2950                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2951                 return C_MASK;
2952         }
2953
2954         if (hg < 0 && /* by intention we do not use mydisk here. */
2955             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2956                 switch (rr_conflict) {
2957                 case ASB_CALL_HELPER:
2958                         drbd_khelper(mdev, "pri-lost");
2959                         /* fall through */
2960                 case ASB_DISCONNECT:
2961                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2962                         return C_MASK;
2963                 case ASB_VIOLENTLY:
2964                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2965                              " assumption\n");
2966                 }
2967         }
2968
2969         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2970                 if (hg == 0)
2971                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2972                 else
2973                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2974                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2975                                  abs(hg) >= 2 ? "full" : "bit-map based");
2976                 return C_MASK;
2977         }
2978
2979         if (abs(hg) >= 2) {
2980                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2981                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2982                                         BM_LOCKED_SET_ALLOWED))
2983                         return C_MASK;
2984         }
2985
2986         if (hg > 0) { /* become sync source. */
2987                 rv = C_WF_BITMAP_S;
2988         } else if (hg < 0) { /* become sync target */
2989                 rv = C_WF_BITMAP_T;
2990         } else {
2991                 rv = C_CONNECTED;
2992                 if (drbd_bm_total_weight(mdev)) {
2993                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2994                              drbd_bm_total_weight(mdev));
2995                 }
2996         }
2997
2998         return rv;
2999 }
3000
3001 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3002 {
3003         /* the peer's ASB_DISCARD_REMOTE corresponds to our ASB_DISCARD_LOCAL... */
3004         if (peer == ASB_DISCARD_REMOTE)
3005                 return ASB_DISCARD_LOCAL;
3006
3007         /* ...and the peer's ASB_DISCARD_LOCAL corresponds to our ASB_DISCARD_REMOTE */
3008         if (peer == ASB_DISCARD_LOCAL)
3009                 return ASB_DISCARD_REMOTE;
3010
3011         /* everything else is valid if they are equal on both sides. */
3012         return peer;
3013 }
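
     /* For example: a peer configured with after-sb-0pri "discard-remote"
      * intends to discard *our* data; seen from this side that is
      * "discard-local". receive_protocol() below therefore accepts the
      * mirrored pair (local "discard-local", peer "discard-remote"), while
      * any asymmetric combination fails the equality check and is rejected
      * as incompatible. */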
3014
3015 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3016 {
3017         struct p_protocol *p = pi->data;
3018         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3019         int p_proto, p_discard_my_data, p_two_primaries, cf;
3020         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3021         char integrity_alg[SHARED_SECRET_MAX] = "";
3022         struct crypto_hash *peer_integrity_tfm = NULL;
3023         void *int_dig_in = NULL, *int_dig_vv = NULL;
3024
3025         p_proto         = be32_to_cpu(p->protocol);
3026         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3027         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3028         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3029         p_two_primaries = be32_to_cpu(p->two_primaries);
3030         cf              = be32_to_cpu(p->conn_flags);
3031         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3032
3033         if (tconn->agreed_pro_version >= 87) {
3034                 int err;
3035
3036                 if (pi->size > sizeof(integrity_alg))
3037                         return -EIO;
3038                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3039                 if (err)
3040                         return err;
3041                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3042         }
3043
3044         if (pi->cmd != P_PROTOCOL_UPDATE) {
3045                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3046
3047                 if (cf & CF_DRY_RUN)
3048                         set_bit(CONN_DRY_RUN, &tconn->flags);
3049
3050                 rcu_read_lock();
3051                 nc = rcu_dereference(tconn->net_conf);
3052
3053                 if (p_proto != nc->wire_protocol) {
3054                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3055                         goto disconnect_rcu_unlock;
3056                 }
3057
3058                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3059                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3060                         goto disconnect_rcu_unlock;
3061                 }
3062
3063                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3064                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3065                         goto disconnect_rcu_unlock;
3066                 }
3067
3068                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3069                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3070                         goto disconnect_rcu_unlock;
3071                 }
3072
3073                 if (p_discard_my_data && nc->discard_my_data) {
3074                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3075                         goto disconnect_rcu_unlock;
3076                 }
3077
3078                 if (p_two_primaries != nc->two_primaries) {
3079                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3080                         goto disconnect_rcu_unlock;
3081                 }
3082
3083                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3084                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3085                         goto disconnect_rcu_unlock;
3086                 }
3087
3088                 rcu_read_unlock();
3089         }
3090
3091         if (integrity_alg[0]) {
3092                 int hash_size;
3093
3094                 /*
3095                  * We can only change the peer data integrity algorithm
3096                  * here.  Changing our own data integrity algorithm
3097                  * requires that we send a P_PROTOCOL_UPDATE packet at
3098                  * the same time; otherwise, the peer has no way to
3099                  * tell between which packets the algorithm should
3100                  * change.
3101                  */
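
                     /* Illustration: if we silently switched our own
                      * sender-side algorithm from, say, crc32c to sha1,
                      * packets already in flight and packets sent after the
                      * switch would carry digests of different algorithms,
                      * and the peer could not tell which is which. */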
3102
3103                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3104                 if (!peer_integrity_tfm) {
3105                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3106                                  integrity_alg);
3107                         goto disconnect;
3108                 }
3109
3110                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3111                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3112                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3113                 if (!(int_dig_in && int_dig_vv)) {
3114                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3115                         goto disconnect;
3116                 }
3117         }
3118
3119         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3120         if (!new_net_conf) {
3121                 conn_err(tconn, "Allocation of new net_conf failed\n");
3122                 goto disconnect;
3123         }
3124
3125         mutex_lock(&tconn->data.mutex);
3126         mutex_lock(&tconn->conf_update);
3127         old_net_conf = tconn->net_conf;
3128         *new_net_conf = *old_net_conf;
3129
3130         new_net_conf->wire_protocol = p_proto;
3131         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3132         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3133         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3134         new_net_conf->two_primaries = p_two_primaries;
3135
3136         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3137         mutex_unlock(&tconn->conf_update);
3138         mutex_unlock(&tconn->data.mutex);
3139
3140         crypto_free_hash(tconn->peer_integrity_tfm);
3141         kfree(tconn->int_dig_in);
3142         kfree(tconn->int_dig_vv);
3143         tconn->peer_integrity_tfm = peer_integrity_tfm;
3144         tconn->int_dig_in = int_dig_in;
3145         tconn->int_dig_vv = int_dig_vv;
3146
3147         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3148                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3149                           integrity_alg[0] ? integrity_alg : "(none)");
3150
3151         synchronize_rcu();
3152         kfree(old_net_conf);
3153         return 0;
3154
3155 disconnect_rcu_unlock:
3156         rcu_read_unlock();
3157 disconnect:
3158         crypto_free_hash(peer_integrity_tfm);
3159         kfree(int_dig_in);
3160         kfree(int_dig_vv);
3161         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3162         return -EIO;
3163 }
3164
3165 /* helper function
3166  * input: alg name, feature name
3167  * return: NULL (alg name was "")
3168  *         ERR_PTR(error) if something goes wrong
3169  *         or the crypto hash ptr, if it worked out ok. */
3170 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3171                 const char *alg, const char *name)
3172 {
3173         struct crypto_hash *tfm;
3174
3175         if (!alg[0])
3176                 return NULL;
3177
3178         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3179         if (IS_ERR(tfm)) {
3180                 dev_err(DEV, "Cannot allocate \"%s\" as %s (reason: %ld)\n",
3181                         alg, name, PTR_ERR(tfm));
3182                 return tfm;
3183         }
3184         return tfm;
3185 }
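
     /* Typical caller pattern -- a sketch; see the verify-alg handling in
      * receive_SyncParam() below for a real instance:
      *
      *     tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
      *     if (IS_ERR(tfm)) {
      *             tfm = NULL;     (the error has already been logged)
      *             goto disconnect;
      *     }
      *     (tfm == NULL here simply means no algorithm was configured)
      */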
3186
3187 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3188 {
3189         void *buffer = tconn->data.rbuf;
3190         int size = pi->size;
3191
3192         while (size) {
3193                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3194                 s = drbd_recv(tconn, buffer, s);
3195                 if (s <= 0) {
3196                         if (s < 0)
3197                                 return s;
3198                         break;
3199                 }
3200                 size -= s;
3201         }
3202         if (size)
3203                 return -EIO;
3204         return 0;
3205 }
3206
3207 /*
3208  * config_unknown_volume  -  device configuration command for unknown volume
3209  *
3210  * When a device is added to an existing connection, the node on which the
3211  * device is added first will send configuration commands to its peer but the
3212  * peer will not know about the device yet.  It will warn and ignore these
3213  * commands.  Once the device is added on the second node, the second node will
3214  * send the same device configuration commands, but in the other direction.
3215  *
3216  * (We can also end up here if drbd is misconfigured.)
3217  */
3218 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3219 {
3220         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3221                   cmdname(pi->cmd), pi->vnr);
3222         return ignore_remaining_packet(tconn, pi);
3223 }
3224
3225 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3226 {
3227         struct drbd_conf *mdev;
3228         struct p_rs_param_95 *p;
3229         unsigned int header_size, data_size, exp_max_sz;
3230         struct crypto_hash *verify_tfm = NULL;
3231         struct crypto_hash *csums_tfm = NULL;
3232         struct net_conf *old_net_conf, *new_net_conf = NULL;
3233         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3234         const int apv = tconn->agreed_pro_version;
3235         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3236         int fifo_size = 0;
3237         int err;
3238
3239         mdev = vnr_to_mdev(tconn, pi->vnr);
3240         if (!mdev)
3241                 return config_unknown_volume(tconn, pi);
3242
3243         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3244                     : apv == 88 ? sizeof(struct p_rs_param)
3245                                         + SHARED_SECRET_MAX
3246                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3247                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3248
3249         if (pi->size > exp_max_sz) {
3250                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3251                     pi->size, exp_max_sz);
3252                 return -EIO;
3253         }
3254
3255         if (apv <= 88) {
3256                 header_size = sizeof(struct p_rs_param);
3257                 data_size = pi->size - header_size;
3258         } else if (apv <= 94) {
3259                 header_size = sizeof(struct p_rs_param_89);
3260                 data_size = pi->size - header_size;
3261                 D_ASSERT(data_size == 0);
3262         } else {
3263                 header_size = sizeof(struct p_rs_param_95);
3264                 data_size = pi->size - header_size;
3265                 D_ASSERT(data_size == 0);
3266         }
3267
3268         /* initialize verify_alg and csums_alg; csums_alg directly follows verify_alg, so one memset clears both */
3269         p = pi->data;
3270         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3271
3272         err = drbd_recv_all(mdev->tconn, p, header_size);
3273         if (err)
3274                 return err;
3275
3276         mutex_lock(&mdev->tconn->conf_update);
3277         old_net_conf = mdev->tconn->net_conf;
3278         if (get_ldev(mdev)) {
3279                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3280                 if (!new_disk_conf) {
3281                         put_ldev(mdev);
3282                         mutex_unlock(&mdev->tconn->conf_update);
3283                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3284                         return -ENOMEM;
3285                 }
3286
3287                 old_disk_conf = mdev->ldev->disk_conf;
3288                 *new_disk_conf = *old_disk_conf;
3289
3290                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3291         }
3292
3293         if (apv >= 88) {
3294                 if (apv == 88) {
3295                         if (data_size > SHARED_SECRET_MAX) {
3296                                 dev_err(DEV, "verify-alg too long, "
3297                                     "peer wants %u, accepting only %u bytes\n",
3298                                                 data_size, SHARED_SECRET_MAX);
3299                                 err = -EIO;
3300                                 goto reconnect;
3301                         }
3302
3303                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3304                         if (err)
3305                                 goto reconnect;
3306                         /* we expect NUL terminated string */
3307                         /* but just in case someone tries to be evil */
3308                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3309                         p->verify_alg[data_size-1] = 0;
3310
3311                 } else /* apv >= 89 */ {
3312                         /* we still expect NUL terminated strings */
3313                         /* but just in case someone tries to be evil */
3314                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3315                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3316                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3317                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3318                 }
3319
3320                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3321                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3322                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3323                                     old_net_conf->verify_alg, p->verify_alg);
3324                                 goto disconnect;
3325                         }
3326                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3327                                         p->verify_alg, "verify-alg");
3328                         if (IS_ERR(verify_tfm)) {
3329                                 verify_tfm = NULL;
3330                                 goto disconnect;
3331                         }
3332                 }
3333
3334                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3335                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3336                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3337                                     old_net_conf->csums_alg, p->csums_alg);
3338                                 goto disconnect;
3339                         }
3340                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3341                                         p->csums_alg, "csums-alg");
3342                         if (IS_ERR(csums_tfm)) {
3343                                 csums_tfm = NULL;
3344                                 goto disconnect;
3345                         }
3346                 }
3347
3348                 if (apv > 94 && new_disk_conf) {
3349                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3350                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3351                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3352                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3353
3354                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3355                         if (fifo_size != mdev->rs_plan_s->size) {
3356                                 new_plan = fifo_alloc(fifo_size);
3357                                 if (!new_plan) {
3358                                         dev_err(DEV, "fifo_alloc of fifo_buffer failed\n");
3359                                         put_ldev(mdev);
3360                                         goto disconnect;
3361                                 }
3362                         }
3363                 }
3364
3365                 if (verify_tfm || csums_tfm) {
3366                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3367                         if (!new_net_conf) {
3368                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3369                                 goto disconnect;
3370                         }
3371
3372                         *new_net_conf = *old_net_conf;
3373
3374                         if (verify_tfm) {
3375                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3376                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3377                                 crypto_free_hash(mdev->tconn->verify_tfm);
3378                                 mdev->tconn->verify_tfm = verify_tfm;
3379                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3380                         }
3381                         if (csums_tfm) {
3382                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3383                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3384                                 crypto_free_hash(mdev->tconn->csums_tfm);
3385                                 mdev->tconn->csums_tfm = csums_tfm;
3386                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3387                         }
3388                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3389                 }
3390         }
3391
3392         if (new_disk_conf) {
3393                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3394                 put_ldev(mdev);
3395         }
3396
3397         if (new_plan) {
3398                 old_plan = mdev->rs_plan_s;
3399                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3400         }
3401
3402         mutex_unlock(&mdev->tconn->conf_update);
3403         synchronize_rcu();
3404         if (new_net_conf)
3405                 kfree(old_net_conf);
3406         kfree(old_disk_conf);
3407         kfree(old_plan);
3408
3409         return 0;
3410
3411 reconnect:
3412         if (new_disk_conf) {
3413                 put_ldev(mdev);
3414                 kfree(new_disk_conf);
3415         }
3416         mutex_unlock(&mdev->tconn->conf_update);
3417         return -EIO;
3418
3419 disconnect:
3420         kfree(new_plan);
3421         if (new_disk_conf) {
3422                 put_ldev(mdev);
3423                 kfree(new_disk_conf);
3424         }
3425         mutex_unlock(&mdev->tconn->conf_update);
3426         /* csums_tfm and verify_tfm may have been allocated already, but
3427          * not yet handed over to the connection; if a later allocation
3428          * failed, free them here so they do not leak. */
3429         crypto_free_hash(csums_tfm);
3430         crypto_free_hash(verify_tfm);
3431         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3432         return -EIO;
3433 }
3434
3435 /* warn if the arguments differ by more than 12.5% */
3436 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3437         const char *s, sector_t a, sector_t b)
3438 {
3439         sector_t d;
3440         if (a == 0 || b == 0)
3441                 return;
3442         d = (a > b) ? (a - b) : (b - a);
3443         if (d > (a>>3) || d > (b>>3))
3444                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3445                      (unsigned long long)a, (unsigned long long)b);
3446 }
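
     /* Worked example: a = 1000 and b = 880 sectors gives d = 120;
      * a>>3 is 125 and b>>3 is 110, so d > b>>3 triggers the warning --
      * the sizes differ by more than 12.5% of the smaller value. */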
3447
3448 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3449 {
3450         struct drbd_conf *mdev;
3451         struct p_sizes *p = pi->data;
3452         enum determine_dev_size dd = unchanged;
3453         sector_t p_size, p_usize, my_usize;
3454         int ldsc = 0; /* local disk size changed */
3455         enum dds_flags ddsf;
3456
3457         mdev = vnr_to_mdev(tconn, pi->vnr);
3458         if (!mdev)
3459                 return config_unknown_volume(tconn, pi);
3460
3461         p_size = be64_to_cpu(p->d_size);
3462         p_usize = be64_to_cpu(p->u_size);
3463
3464         /* just store the peer's disk size for now.
3465          * we still need to figure out whether we accept that. */
3466         mdev->p_size = p_size;
3467
3468         if (get_ldev(mdev)) {
3469                 rcu_read_lock();
3470                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3471                 rcu_read_unlock();
3472
3473                 warn_if_differ_considerably(mdev, "lower level device sizes",
3474                            p_size, drbd_get_max_capacity(mdev->ldev));
3475                 warn_if_differ_considerably(mdev, "user requested size",
3476                                             p_usize, my_usize);
3477
3478                 /* if this is the first connect, or an otherwise expected
3479                  * param exchange, choose the minimum */
3480                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3481                         p_usize = min_not_zero(my_usize, p_usize);
3482
3483                 /* Never shrink a device with usable data during connect.
3484                    But allow online shrinking if we are connected. */
3485                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3486                     drbd_get_capacity(mdev->this_bdev) &&
3487                     mdev->state.disk >= D_OUTDATED &&
3488                     mdev->state.conn < C_CONNECTED) {
3489                         dev_err(DEV, "The peer's disk size is too small!\n");
3490                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3491                         put_ldev(mdev);
3492                         return -EIO;
3493                 }
3494
3495                 if (my_usize != p_usize) {
3496                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3497
3498                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3499                         if (!new_disk_conf) {
3500                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3501                                 put_ldev(mdev);
3502                                 return -ENOMEM;
3503                         }
3504
3505                         mutex_lock(&mdev->tconn->conf_update);
3506                         old_disk_conf = mdev->ldev->disk_conf;
3507                         *new_disk_conf = *old_disk_conf;
3508                         new_disk_conf->disk_size = p_usize;
3509
3510                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3511                         mutex_unlock(&mdev->tconn->conf_update);
3512                         synchronize_rcu();
3513                         kfree(old_disk_conf);
3514
3515                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3516                                  (unsigned long)p_usize);
3517                 }
3518
3519                 put_ldev(mdev);
3520         }
3521
3522         ddsf = be16_to_cpu(p->dds_flags);
3523         if (get_ldev(mdev)) {
3524                 dd = drbd_determine_dev_size(mdev, ddsf);
3525                 put_ldev(mdev);
3526                 if (dd == dev_size_error)
3527                         return -EIO;
3528                 drbd_md_sync(mdev);
3529         } else {
3530                 /* I am diskless, need to accept the peer's size. */
3531                 drbd_set_my_capacity(mdev, p_size);
3532         }
3533
3534         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3535         drbd_reconsider_max_bio_size(mdev);
3536
3537         if (get_ldev(mdev)) {
3538                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3539                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3540                         ldsc = 1;
3541                 }
3542
3543                 put_ldev(mdev);
3544         }
3545
3546         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3547                 if (be64_to_cpu(p->c_size) !=
3548                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3549                         /* we have different sizes, probably peer
3550                          * needs to know my new size... */
3551                         drbd_send_sizes(mdev, 0, ddsf);
3552                 }
3553                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3554                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3555                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3556                             mdev->state.disk >= D_INCONSISTENT) {
3557                                 if (ddsf & DDSF_NO_RESYNC)
3558                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3559                                 else
3560                                         resync_after_online_grow(mdev);
3561                         } else
3562                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3563                 }
3564         }
3565
3566         return 0;
3567 }
3568
3569 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3570 {
3571         struct drbd_conf *mdev;
3572         struct p_uuids *p = pi->data;
3573         u64 *p_uuid;
3574         int i, updated_uuids = 0;
3575
3576         mdev = vnr_to_mdev(tconn, pi->vnr);
3577         if (!mdev)
3578                 return config_unknown_volume(tconn, pi);
3579
3580         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3581
3582         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3583                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3584
3585         kfree(mdev->p_uuid);
3586         mdev->p_uuid = p_uuid;
3587
3588         if (mdev->state.conn < C_CONNECTED &&
3589             mdev->state.disk < D_INCONSISTENT &&
3590             mdev->state.role == R_PRIMARY &&
3591             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3592                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3593                     (unsigned long long)mdev->ed_uuid);
3594                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3595                 return -EIO;
3596         }
3597
3598         if (get_ldev(mdev)) {
3599                 int skip_initial_sync =
3600                         mdev->state.conn == C_CONNECTED &&
3601                         mdev->tconn->agreed_pro_version >= 90 &&
3602                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3603                         (p_uuid[UI_FLAGS] & 8);
3604                 if (skip_initial_sync) {
3605                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3606                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3607                                         "clear_n_write from receive_uuids",
3608                                         BM_LOCKED_TEST_ALLOWED);
3609                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3610                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3611                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3612                                         CS_VERBOSE, NULL);
3613                         drbd_md_sync(mdev);
3614                         updated_uuids = 1;
3615                 }
3616                 put_ldev(mdev);
3617         } else if (mdev->state.disk < D_INCONSISTENT &&
3618                    mdev->state.role == R_PRIMARY) {
3619                 /* I am a diskless primary, the peer just created a new current UUID
3620                    for me. */
3621                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3622         }
3623
3624         /* Before we test for the disk state, wait until any ongoing
3625            cluster-wide state change has finished. That is important if
3626            we are primary and are detaching from our disk: we need to see
3627            the new disk state... */
3628         mutex_lock(mdev->state_mutex);
3629         mutex_unlock(mdev->state_mutex);
3630         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3631                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3632
3633         if (updated_uuids)
3634                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3635
3636         return 0;
3637 }
3638
3639 /**
3640  * convert_state() - Converts the peer's view of the cluster state to our point of view
3641  * @ps:         The state as seen by the peer.
3642  */
3643 static union drbd_state convert_state(union drbd_state ps)
3644 {
3645         union drbd_state ms;
3646
3647         static enum drbd_conns c_tab[] = {
3648                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3649                 [C_CONNECTED] = C_CONNECTED,
3650
3651                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3652                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3653                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3654                 [C_VERIFY_S]       = C_VERIFY_T,
3655                 [C_MASK]   = C_MASK,
3656         };
3657
3658         ms.i = ps.i;
3659
3660         ms.conn = c_tab[ps.conn];
3661         ms.peer = ps.role;
3662         ms.role = ps.peer;
3663         ms.pdsk = ps.disk;
3664         ms.disk = ps.pdsk;
3665         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3666
3667         return ms;
3668 }
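
     /* Example: if the peer reports (role=Primary, peer=Secondary,
      * disk=UpToDate, pdsk=Inconsistent, conn=StartingSyncS), then seen
      * from this node the same situation reads (role=Secondary,
      * peer=Primary, disk=Inconsistent, pdsk=UpToDate,
      * conn=StartingSyncT): roles and disk states swap sides, and the
      * connection state is mirrored via c_tab[]. */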
3669
3670 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3671 {
3672         struct drbd_conf *mdev;
3673         struct p_req_state *p = pi->data;
3674         union drbd_state mask, val;
3675         enum drbd_state_rv rv;
3676
3677         mdev = vnr_to_mdev(tconn, pi->vnr);
3678         if (!mdev)
3679                 return -EIO;
3680
3681         mask.i = be32_to_cpu(p->mask);
3682         val.i = be32_to_cpu(p->val);
3683
3684         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3685             mutex_is_locked(mdev->state_mutex)) {
3686                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3687                 return 0;
3688         }
3689
3690         mask = convert_state(mask);
3691         val = convert_state(val);
3692
3693         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3694         drbd_send_sr_reply(mdev, rv);
3695
3696         drbd_md_sync(mdev);
3697
3698         return 0;
3699 }
3700
3701 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3702 {
3703         struct p_req_state *p = pi->data;
3704         union drbd_state mask, val;
3705         enum drbd_state_rv rv;
3706
3707         mask.i = be32_to_cpu(p->mask);
3708         val.i = be32_to_cpu(p->val);
3709
3710         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3711             mutex_is_locked(&tconn->cstate_mutex)) {
3712                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3713                 return 0;
3714         }
3715
3716         mask = convert_state(mask);
3717         val = convert_state(val);
3718
3719         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3720         conn_send_sr_reply(tconn, rv);
3721
3722         return 0;
3723 }
3724
3725 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3726 {
3727         struct drbd_conf *mdev;
3728         struct p_state *p = pi->data;
3729         union drbd_state os, ns, peer_state;
3730         enum drbd_disk_state real_peer_disk;
3731         enum chg_state_flags cs_flags;
3732         int rv;
3733
3734         mdev = vnr_to_mdev(tconn, pi->vnr);
3735         if (!mdev)
3736                 return config_unknown_volume(tconn, pi);
3737
3738         peer_state.i = be32_to_cpu(p->state);
3739
3740         real_peer_disk = peer_state.disk;
3741         if (peer_state.disk == D_NEGOTIATING) {
3742                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3743                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3744         }
3745
3746         spin_lock_irq(&mdev->tconn->req_lock);
3747  retry:
3748         os = ns = drbd_read_state(mdev);
3749         spin_unlock_irq(&mdev->tconn->req_lock);
3750
3751         /* If this is the "end of sync" confirmation, usually the peer disk
3752          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3753          * set) resync started in PausedSyncT, or if the timing of pause-/
3754          * unpause-sync events has been "just right", the peer disk may
3755          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3756          */
3757         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3758             real_peer_disk == D_UP_TO_DATE &&
3759             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3760                 /* If we are (becoming) SyncSource, but peer is still in sync
3761                  * preparation, ignore its uptodate-ness to avoid flapping, it
3762                  * will change to inconsistent once the peer reaches active
3763                  * syncing states.
3764                  * It may have changed syncer-paused flags, however, so we
3765                  * cannot ignore this completely. */
3766                 if (peer_state.conn > C_CONNECTED &&
3767                     peer_state.conn < C_SYNC_SOURCE)
3768                         real_peer_disk = D_INCONSISTENT;
3769
3770                 /* if peer_state changes to connected at the same time,
3771                  * it explicitly notifies us that it finished resync.
3772                  * Maybe we should finish it up, too? */
3773                 else if (os.conn >= C_SYNC_SOURCE &&
3774                          peer_state.conn == C_CONNECTED) {
3775                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3776                                 drbd_resync_finished(mdev);
3777                         return 0;
3778                 }
3779         }
3780
3781         /* peer says his disk is inconsistent, while we think it is uptodate,
3782          * and this happens while the peer still thinks we have a sync going on,
3783          * but we think we are already done with the sync.
3784          * We ignore this to avoid flapping pdsk.
3785          * This should not happen, if the peer is a recent version of drbd. */
3786         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3787             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3788                 real_peer_disk = D_UP_TO_DATE;
3789
3790         if (ns.conn == C_WF_REPORT_PARAMS)
3791                 ns.conn = C_CONNECTED;
3792
3793         if (peer_state.conn == C_AHEAD)
3794                 ns.conn = C_BEHIND;
3795
3796         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3797             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3798                 int cr; /* consider resync */
3799
3800                 /* if we established a new connection */
3801                 cr  = (os.conn < C_CONNECTED);
3802                 /* if we had an established connection
3803                  * and one of the nodes newly attaches a disk */
3804                 cr |= (os.conn == C_CONNECTED &&
3805                        (peer_state.disk == D_NEGOTIATING ||
3806                         os.disk == D_NEGOTIATING));
3807                 /* if we have both been inconsistent, and the peer has been
3808                  * forced to be UpToDate with --overwrite-data */
3809                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3810                 /* if we had been plain connected, and the admin requested to
3811                  * start a sync by "invalidate" or "invalidate-remote" */
3812                 cr |= (os.conn == C_CONNECTED &&
3813                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3814                                  peer_state.conn <= C_WF_BITMAP_T));
3815
3816                 if (cr)
3817                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3818
3819                 put_ldev(mdev);
3820                 if (ns.conn == C_MASK) {
3821                         ns.conn = C_CONNECTED;
3822                         if (mdev->state.disk == D_NEGOTIATING) {
3823                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3824                         } else if (peer_state.disk == D_NEGOTIATING) {
3825                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3826                                 peer_state.disk = D_DISKLESS;
3827                                 real_peer_disk = D_DISKLESS;
3828                         } else {
3829                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3830                                         return -EIO;
3831                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3832                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3833                                 return -EIO;
3834                         }
3835                 }
3836         }
3837
3838         spin_lock_irq(&mdev->tconn->req_lock);
3839         if (os.i != drbd_read_state(mdev).i)
3840                 goto retry;
3841         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3842         ns.peer = peer_state.role;
3843         ns.pdsk = real_peer_disk;
3844         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3845         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3846                 ns.disk = mdev->new_state_tmp.disk;
3847         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3848         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3849             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3850                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3851                    for temporary network outages! */
3852                 spin_unlock_irq(&mdev->tconn->req_lock);
3853                 dev_err(DEV, "Aborting Connect, cannot thaw IO with an only Consistent peer\n");
3854                 tl_clear(mdev->tconn);
3855                 drbd_uuid_new_current(mdev);
3856                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3857                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3858                 return -EIO;
3859         }
3860         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3861         ns = drbd_read_state(mdev);
3862         spin_unlock_irq(&mdev->tconn->req_lock);
3863
3864         if (rv < SS_SUCCESS) {
3865                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3866                 return -EIO;
3867         }
3868
3869         if (os.conn > C_WF_REPORT_PARAMS) {
3870                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3871                     peer_state.disk != D_NEGOTIATING) {
3872                         /* we want resync, peer has not yet decided to sync... */
3873                         /* Nowadays only used when forcing a node into primary role and
3874                            setting its disk to UpToDate with that */
3875                         drbd_send_uuids(mdev);
3876                         drbd_send_state(mdev);
3877                 }
3878         }
3879
3880         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3881
3882         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3883
3884         return 0;
3885 }
3886
3887 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3888 {
3889         struct drbd_conf *mdev;
3890         struct p_rs_uuid *p = pi->data;
3891
3892         mdev = vnr_to_mdev(tconn, pi->vnr);
3893         if (!mdev)
3894                 return -EIO;
3895
3896         wait_event(mdev->misc_wait,
3897                    mdev->state.conn == C_WF_SYNC_UUID ||
3898                    mdev->state.conn == C_BEHIND ||
3899                    mdev->state.conn < C_CONNECTED ||
3900                    mdev->state.disk < D_NEGOTIATING);
3901
3902         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3903
3904         /* Here the _drbd_uuid_ functions are right, current should
3905            _not_ be rotated into the history */
3906         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3907                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3908                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3909
3910                 drbd_print_uuids(mdev, "updated sync uuid");
3911                 drbd_start_resync(mdev, C_SYNC_TARGET);
3912
3913                 put_ldev(mdev);
3914         } else
3915                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3916
3917         return 0;
3918 }
3919
3920 /**
3921  * receive_bitmap_plain - receive one plain-text (uncompressed) bitmap chunk
3922  *
3923  * Return 0 when done, 1 when another iteration is needed, and a negative error
3924  * code upon failure.
3925  */
3926 static int
3927 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3928                      unsigned long *p, struct bm_xfer_ctx *c)
3929 {
3930         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3931                                  drbd_header_size(mdev->tconn);
3932         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3933                                        c->bm_words - c->word_offset);
3934         unsigned int want = num_words * sizeof(*p);
3935         int err;
3936
3937         if (want != size) {
3938                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3939                 return -EIO;
3940         }
3941         if (want == 0)
3942                 return 0;
3943         err = drbd_recv_all(mdev->tconn, p, want);
3944         if (err)
3945                 return err;
3946
3947         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3948
3949         c->word_offset += num_words;
3950         c->bit_offset = c->word_offset * BITS_PER_LONG;
3951         if (c->bit_offset > c->bm_bits)
3952                 c->bit_offset = c->bm_bits;
3953
3954         return 1;
3955 }
3956
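     /* Layout of the p_compressed_bm encoding byte, as implied by the
      * accessors below:
      *   bits 0-3: bitmap encoding (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
      *   bits 4-6: number of padding bits at the end of the bit stream
      *   bit  7:   whether the first decoded run consists of set bits
      */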
3957 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3958 {
3959         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3960 }
3961
3962 static int dcbp_get_start(struct p_compressed_bm *p)
3963 {
3964         return (p->encoding & 0x80) != 0;
3965 }
3966
3967 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3968 {
3969         return (p->encoding >> 4) & 0x7;
3970 }
3971
3972 /**
3973  * recv_bm_rle_bits - decode one RLE+VLI compressed chunk of the bitmap
3974  *
3975  * Return 0 when done, 1 when another iteration is needed, and a negative error
3976  * code upon failure.
3977  */
3978 static int
3979 recv_bm_rle_bits(struct drbd_conf *mdev,
3980                 struct p_compressed_bm *p,
3981                  struct bm_xfer_ctx *c,
3982                  unsigned int len)
3983 {
3984         struct bitstream bs;
3985         u64 look_ahead;
3986         u64 rl;
3987         u64 tmp;
3988         unsigned long s = c->bit_offset;
3989         unsigned long e;
3990         int toggle = dcbp_get_start(p);
3991         int have;
3992         int bits;
3993
3994         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3995
3996         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3997         if (bits < 0)
3998                 return -EIO;
3999
4000         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4001                 bits = vli_decode_bits(&rl, look_ahead);
4002                 if (bits <= 0)
4003                         return -EIO;
4004
4005                 if (toggle) {
4006                         e = s + rl - 1;
4007                         if (e >= c->bm_bits) {
4008                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4009                                 return -EIO;
4010                         }
4011                         _drbd_bm_set_bits(mdev, s, e);
4012                 }
4013
4014                 if (have < bits) {
4015                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4016                                 have, bits, look_ahead,
4017                                 (unsigned int)(bs.cur.b - p->code),
4018                                 (unsigned int)bs.buf_len);
4019                         return -EIO;
4020                 }
4021                 look_ahead >>= bits;
4022                 have -= bits;
4023
4024                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4025                 if (bits < 0)
4026                         return -EIO;
4027                 look_ahead |= tmp << have;
4028                 have += bits;
4029         }
4030
4031         c->bit_offset = s;
4032         bm_xfer_ctx_bit_to_word_offset(c);
4033
4034         return (s != c->bm_bits);
4035 }
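
     /* Decoding example: with dcbp_get_start(p) == 0 and decoded run
      * lengths 5, 3, 7, the runs alternate clear/set/clear: bits 0-4 stay
      * clear, bits 5-7 are set via _drbd_bm_set_bits(), bits 8-14 stay
      * clear, and c->bit_offset advances by 15 bits. */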
4036
4037 /**
4038  * decode_bitmap_c - dispatch decoding of one compressed bitmap chunk
4039  *
4040  * Return 0 when done, 1 when another iteration is needed, and a negative error
4041  * code upon failure.
4042  */
4043 static int
4044 decode_bitmap_c(struct drbd_conf *mdev,
4045                 struct p_compressed_bm *p,
4046                 struct bm_xfer_ctx *c,
4047                 unsigned int len)
4048 {
4049         if (dcbp_get_code(p) == RLE_VLI_Bits)
4050                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4051
4052         /* other variants had been implemented for evaluation,
4053          * but have been dropped as this one turned out to be "best"
4054          * during all our tests. */
4055
4056         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4057         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4058         return -EIO;
4059 }
4060
4061 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4062                 const char *direction, struct bm_xfer_ctx *c)
4063 {
4064         /* what would it take to transfer it "plaintext" */
4065         unsigned int header_size = drbd_header_size(mdev->tconn);
4066         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4067         unsigned int plain =
4068                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4069                 c->bm_words * sizeof(unsigned long);
4070         unsigned int total = c->bytes[0] + c->bytes[1];
4071         unsigned int r;
4072
4073         /* total can not be zero. but just in case: */
4074         if (total == 0)
4075                 return;
4076
4077         /* don't report if not compressed */
4078         if (total >= plain)
4079                 return;
4080
4081         /* total < plain. check for overflow, still */
4082         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4083                                     : (1000 * total / plain);
4084
4085         if (r > 1000)
4086                 r = 1000;
4087
4088         r = 1000 - r;
4089         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4090              "total %u; compression: %u.%u%%\n",
4091                         direction,
4092                         c->bytes[1], c->packets[1],
4093                         c->bytes[0], c->packets[0],
4094                         total, r/10, r % 10);
4095 }
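
/*
 * Worked example for the ratio computed above (made-up numbers):
 * plain = 1000000 bytes, total = 100000 bytes.  total is well below
 * UINT_MAX/1000, so r = 1000 * 100000 / 1000000 = 100, the saving is
 * 1000 - 100 = 900, and we print "compression: 90.0%".  Only when
 * total > UINT_MAX/1000 would 1000 * total overflow 32 bit, which is
 * why the other branch divides plain by 1000 instead.
 */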
4096
4097 /* Since we are processing the bitfield from lower addresses to higher,
4098    it does not matter if we process it in 32 bit chunks or 64 bit
4099    chunks as long as it is little endian. (Understand it as a byte stream,
4100    beginning with the lowest byte...) If we used big endian,
4101    we would need to process it from the highest address to the lowest
4102    in order to be agnostic to the 32 vs 64 bit issue.
4103
4104    Returns 0 on success, a negative error code otherwise. */
4105 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4106 {
4107         struct drbd_conf *mdev;
4108         struct bm_xfer_ctx c;
4109         int err;
4110
4111         mdev = vnr_to_mdev(tconn, pi->vnr);
4112         if (!mdev)
4113                 return -EIO;
4114
4115         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4116         /* you are supposed to send additional out-of-sync information
4117          * if you actually set bits during this phase */
4118
4119         c = (struct bm_xfer_ctx) {
4120                 .bm_bits = drbd_bm_bits(mdev),
4121                 .bm_words = drbd_bm_words(mdev),
4122         };
4123
4124         for (;;) {
4125                 if (pi->cmd == P_BITMAP)
4126                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4127                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4128                         /* MAYBE: sanity check that we speak proto >= 90,
4129                          * and the feature is enabled! */
4130                         struct p_compressed_bm *p = pi->data;
4131
4132                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4133                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4134                                 err = -EIO;
4135                                 goto out;
4136                         }
4137                         if (pi->size <= sizeof(*p)) {
4138                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4139                                 err = -EIO;
4140                                 goto out;
4141                         }
4142                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4143                         if (err)
4144                                 goto out;
4145                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4146                 } else {
4147                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4148                         err = -EIO;
4149                         goto out;
4150                 }
4151
4152                 c.packets[pi->cmd == P_BITMAP]++;
4153                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4154
4155                 if (err <= 0) {
4156                         if (err < 0)
4157                                 goto out;
4158                         break;
4159                 }
4160                 err = drbd_recv_header(mdev->tconn, pi);
4161                 if (err)
4162                         goto out;
4163         }
4164
4165         INFO_bm_xfer_stats(mdev, "receive", &c);
4166
4167         if (mdev->state.conn == C_WF_BITMAP_T) {
4168                 enum drbd_state_rv rv;
4169
4170                 err = drbd_send_bitmap(mdev);
4171                 if (err)
4172                         goto out;
4173                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4174                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4175                 D_ASSERT(rv == SS_SUCCESS);
4176         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4177                 /* admin may have requested C_DISCONNECTING,
4178                  * other threads may have noticed network errors */
4179                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4180                     drbd_conn_str(mdev->state.conn));
4181         }
4182         err = 0;
4183
4184  out:
4185         drbd_bm_unlock(mdev);
4186         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4187                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4188         return err;
4189 }
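
/*
 * Sketch of the bitmap exchange as implemented above: the peer sends a
 * sequence of P_BITMAP or P_COMPRESSED_BITMAP packets, which we decode
 * until the decoder reports "done" (bit_offset reached bm_bits).  As
 * C_WF_BITMAP_T we then send our own bitmap back and request the
 * C_WF_SYNC_UUID transition; as C_WF_BITMAP_S we start the resync as
 * sync source once the transfer completed without error.
 */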
4190
4191 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4192 {
4193         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4194                  pi->cmd, pi->size);
4195
4196         return ignore_remaining_packet(tconn, pi);
4197 }
4198
4199 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4200 {
4201         /* Make sure we've acked all the TCP data associated
4202          * with the data requests being unplugged */
4203         drbd_tcp_quickack(tconn->data.socket);
4204
4205         return 0;
4206 }
4207
4208 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4209 {
4210         struct drbd_conf *mdev;
4211         struct p_block_desc *p = pi->data;
4212
4213         mdev = vnr_to_mdev(tconn, pi->vnr);
4214         if (!mdev)
4215                 return -EIO;
4216
4217         switch (mdev->state.conn) {
4218         case C_WF_SYNC_UUID:
4219         case C_WF_BITMAP_T:
4220         case C_BEHIND:
4221                 break;
4222         default:
4223                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4224                                 drbd_conn_str(mdev->state.conn));
4225         }
4226
4227         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4228
4229         return 0;
4230 }
4231
4232 struct data_cmd {
4233         int expect_payload;
4234         size_t pkt_size;
4235         int (*fn)(struct drbd_tconn *, struct packet_info *);
4236 };
4237
4238 static struct data_cmd drbd_cmd_handler[] = {
4239         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4240         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4241         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4242         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4243         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4244         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4245         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4246         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4247         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4248         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4249         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4250         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4251         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4252         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4253         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4254         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4255         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4256         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4257         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4258         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4259         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4260         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4261         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4262         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4263 };
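
/*
 * drbdd() below dispatches by indexing this table with the packet code
 * from the header: pkt_size bytes of fixed sub-header are received
 * before fn() is called, and expect_payload says whether more than
 * pkt_size bytes may follow.  A hypothetical new packet would be wired
 * up with one more initializer, e.g.
 *
 *	[P_EXAMPLE] = { 0, sizeof(struct p_example), receive_example },
 *
 * where P_EXAMPLE, struct p_example and receive_example are
 * placeholders, not part of the actual protocol.
 */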
4264
4265 static void drbdd(struct drbd_tconn *tconn)
4266 {
4267         struct packet_info pi;
4268         size_t shs; /* sub header size */
4269         int err;
4270
4271         while (get_t_state(&tconn->receiver) == RUNNING) {
4272                 struct data_cmd *cmd;
4273
4274                 drbd_thread_current_set_cpu(&tconn->receiver);
4275                 if (drbd_recv_header(tconn, &pi))
4276                         goto err_out;
4277
4278                 cmd = &drbd_cmd_handler[pi.cmd];
4279                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4280                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4281                                  cmdname(pi.cmd), pi.cmd);
4282                         goto err_out;
4283                 }
4284
4285                 shs = cmd->pkt_size;
4286                 if (pi.size > shs && !cmd->expect_payload) {
4287                         conn_err(tconn, "No payload expected %s l:%d\n",
4288                                  cmdname(pi.cmd), pi.size);
4289                         goto err_out;
4290                 }
4291
4292                 if (shs) {
4293                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4294                         if (err)
4295                                 goto err_out;
4296                         pi.size -= shs;
4297                 }
4298
4299                 err = cmd->fn(tconn, &pi);
4300                 if (err) {
4301                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4302                                  cmdname(pi.cmd), err, pi.size);
4303                         goto err_out;
4304                 }
4305         }
4306         return;
4307
4308     err_out:
4309         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4310 }
4311
4312 void conn_flush_workqueue(struct drbd_tconn *tconn)
4313 {
4314         struct drbd_wq_barrier barr;
4315
4316         barr.w.cb = w_prev_work_done;
4317         barr.w.tconn = tconn;
4318         init_completion(&barr.done);
4319         drbd_queue_work(&tconn->data.work, &barr.w);
4320         wait_for_completion(&barr.done);
4321 }
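
/*
 * This barrier relies on the worker draining &tconn->data.work in FIFO
 * order: w_prev_work_done() merely completes barr.done, so once
 * wait_for_completion() returns, every work item queued before the
 * barrier has been processed.  barr may live on the stack precisely
 * because we block until the callback has run.
 */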
4322
4323 static void conn_disconnect(struct drbd_tconn *tconn)
4324 {
4325         struct drbd_conf *mdev;
4326         enum drbd_conns oc;
4327         int vnr;
4328
4329         if (tconn->cstate == C_STANDALONE)
4330                 return;
4331
4332         /* asender does not clean up anything. it must not interfere, either */
4333         drbd_thread_stop(&tconn->asender);
4334         drbd_free_sock(tconn);
4335
4336         rcu_read_lock();
4337         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4338                 kref_get(&mdev->kref);
4339                 rcu_read_unlock();
4340                 drbd_disconnected(mdev);
4341                 kref_put(&mdev->kref, &drbd_minor_destroy);
4342                 rcu_read_lock();
4343         }
4344         rcu_read_unlock();
4345
4346         conn_info(tconn, "Connection closed\n");
4347
4348         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4349                 conn_try_outdate_peer_async(tconn);
4350
4351         spin_lock_irq(&tconn->req_lock);
4352         oc = tconn->cstate;
4353         if (oc >= C_UNCONNECTED)
4354                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4355
4356         spin_unlock_irq(&tconn->req_lock);
4357
4358         if (oc == C_DISCONNECTING)
4359                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4360 }
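
/*
 * Note the reference dance in the idr loop above: drbd_disconnected()
 * may sleep, which is not allowed under rcu_read_lock().  So each
 * volume is pinned with kref_get(), the RCU read lock is dropped for
 * the actual work, and it is re-taken before the iteration advances.
 * The same pattern is used wherever we iterate tconn->volumes and may
 * block.
 */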
4361
4362 static int drbd_disconnected(struct drbd_conf *mdev)
4363 {
4364         unsigned int i;
4365
4366         /* wait for current activity to cease. */
4367         spin_lock_irq(&mdev->tconn->req_lock);
4368         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4369         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4370         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4371         spin_unlock_irq(&mdev->tconn->req_lock);
4372
4373         /* We do not have data structures that would allow us to
4374          * get the rs_pending_cnt down to 0 again.
4375          *  * On C_SYNC_TARGET we do not have any data structures describing
4376          *    the pending RSDataRequests we have sent.
4377          *  * On C_SYNC_SOURCE there is no data structure that tracks
4378          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4379          *  And no, it is not the sum of the reference counts in the
4380          *  resync_LRU. The resync_LRU tracks the whole operation including
4381          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4382          *  on the fly. */
4383         drbd_rs_cancel_all(mdev);
4384         mdev->rs_total = 0;
4385         mdev->rs_failed = 0;
4386         atomic_set(&mdev->rs_pending_cnt, 0);
4387         wake_up(&mdev->misc_wait);
4388
4389         del_timer_sync(&mdev->resync_timer);
4390         resync_timer_fn((unsigned long)mdev);
4391
4392         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4393          * w_make_resync_request etc. which may still be on the worker queue
4394          * to be "canceled" */
4395         drbd_flush_workqueue(mdev);
4396
4397         drbd_finish_peer_reqs(mdev);
4398
4399         kfree(mdev->p_uuid);
4400         mdev->p_uuid = NULL;
4401
4402         if (!drbd_suspended(mdev))
4403                 tl_clear(mdev->tconn);
4404
4405         drbd_md_sync(mdev);
4406
4407         /* serialize with bitmap writeout triggered by the state change,
4408          * if any. */
4409         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4410
4411         /* tcp_close and release of sendpage pages can be deferred.  I don't
4412          * want to use SO_LINGER, because apparently it can be deferred for
4413          * more than 20 seconds (longest time I checked).
4414          *
4415          * Actually we don't care for exactly when the network stack does its
4416          * put_page(), but release our reference on these pages right here.
4417          */
4418         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4419         if (i)
4420                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4421         i = atomic_read(&mdev->pp_in_use_by_net);
4422         if (i)
4423                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4424         i = atomic_read(&mdev->pp_in_use);
4425         if (i)
4426                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4427
4428         D_ASSERT(list_empty(&mdev->read_ee));
4429         D_ASSERT(list_empty(&mdev->active_ee));
4430         D_ASSERT(list_empty(&mdev->sync_ee));
4431         D_ASSERT(list_empty(&mdev->done_ee));
4432
4433         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4434         atomic_set(&mdev->current_epoch->epoch_size, 0);
4435         D_ASSERT(list_empty(&mdev->current_epoch->list));
4436
4437         return 0;
4438 }
4439
4440 /*
4441  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4442  * we can agree on is stored in agreed_pro_version.
4443  *
4444  * feature flags and the reserved array should be enough room for future
4445  * enhancements of the handshake protocol, and possible plugins...
4446  *
4447  * for now, they are expected to be zero, but ignored.
4448  */
4449 static int drbd_send_features(struct drbd_tconn *tconn)
4450 {
4451         struct drbd_socket *sock;
4452         struct p_connection_features *p;
4453
4454         sock = &tconn->data;
4455         p = conn_prepare_command(tconn, sock);
4456         if (!p)
4457                 return -EIO;
4458         memset(p, 0, sizeof(*p));
4459         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4460         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4461         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4462 }
4463
4464 /*
4465  * return values:
4466  *   1 yes, we have a valid connection
4467  *   0 oops, did not work out, please try again
4468  *  -1 peer talks different language,
4469  *     no point in trying again, please go standalone.
4470  */
4471 static int drbd_do_features(struct drbd_tconn *tconn)
4472 {
4473         /* ASSERT current == tconn->receiver ... */
4474         struct p_connection_features *p;
4475         const int expect = sizeof(struct p_connection_features);
4476         struct packet_info pi;
4477         int err;
4478
4479         err = drbd_send_features(tconn);
4480         if (err)
4481                 return 0;
4482
4483         err = drbd_recv_header(tconn, &pi);
4484         if (err)
4485                 return 0;
4486
4487         if (pi.cmd != P_CONNECTION_FEATURES) {
4488                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4489                          cmdname(pi.cmd), pi.cmd);
4490                 return -1;
4491         }
4492
4493         if (pi.size != expect) {
4494                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4495                      expect, pi.size);
4496                 return -1;
4497         }
4498
4499         p = pi.data;
4500         err = drbd_recv_all_warn(tconn, p, expect);
4501         if (err)
4502                 return 0;
4503
4504         p->protocol_min = be32_to_cpu(p->protocol_min);
4505         p->protocol_max = be32_to_cpu(p->protocol_max);
4506         if (p->protocol_max == 0)
4507                 p->protocol_max = p->protocol_min;
4508
4509         if (PRO_VERSION_MAX < p->protocol_min ||
4510             PRO_VERSION_MIN > p->protocol_max)
4511                 goto incompat;
4512
4513         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4514
4515         conn_info(tconn, "Handshake successful: "
4516              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4517
4518         return 1;
4519
4520  incompat:
4521         conn_err(tconn, "incompatible DRBD dialects: "
4522             "I support %d-%d, peer supports %d-%d\n",
4523             PRO_VERSION_MIN, PRO_VERSION_MAX,
4524             p->protocol_min, p->protocol_max);
4525         return -1;
4526 }
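
/*
 * Example negotiation with hypothetical numbers: if we support
 * protocol versions 86..101 and the peer reports min 90, max 96, the
 * ranges overlap and agreed_pro_version becomes min(101, 96) = 96.
 * A peer reporting 70..85 would trip the second range check above
 * (86 > 85) and end up at incompat.
 */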
4527
4528 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4529 static int drbd_do_auth(struct drbd_tconn *tconn)
4530 {
4531         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4532         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4533         return -1;
4534 }
4535 #else
4536 #define CHALLENGE_LEN 64
4537
4538 /* Return value:
4539         1 - auth succeeded,
4540         0 - failed, try again (network error),
4541         -1 - auth failed, don't try again.
4542 */
4543
4544 static int drbd_do_auth(struct drbd_tconn *tconn)
4545 {
4546         struct drbd_socket *sock;
4547         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4548         struct scatterlist sg;
4549         char *response = NULL;
4550         char *right_response = NULL;
4551         char *peers_ch = NULL;
4552         unsigned int key_len;
4553         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4554         unsigned int resp_size;
4555         struct hash_desc desc;
4556         struct packet_info pi;
4557         struct net_conf *nc;
4558         int err, rv;
4559
4560         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4561
4562         rcu_read_lock();
4563         nc = rcu_dereference(tconn->net_conf);
4564         key_len = strlen(nc->shared_secret);
4565         memcpy(secret, nc->shared_secret, key_len);
4566         rcu_read_unlock();
4567
4568         desc.tfm = tconn->cram_hmac_tfm;
4569         desc.flags = 0;
4570
4571         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4572         if (rv) {
4573                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4574                 rv = -1;
4575                 goto fail;
4576         }
4577
4578         get_random_bytes(my_challenge, CHALLENGE_LEN);
4579
4580         sock = &tconn->data;
4581         if (!conn_prepare_command(tconn, sock)) {
4582                 rv = 0;
4583                 goto fail;
4584         }
4585         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4586                                 my_challenge, CHALLENGE_LEN);
4587         if (!rv)
4588                 goto fail;
4589
4590         err = drbd_recv_header(tconn, &pi);
4591         if (err) {
4592                 rv = 0;
4593                 goto fail;
4594         }
4595
4596         if (pi.cmd != P_AUTH_CHALLENGE) {
4597                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4598                          cmdname(pi.cmd), pi.cmd);
4599                 rv = 0;
4600                 goto fail;
4601         }
4602
4603         if (pi.size > CHALLENGE_LEN * 2) {
4604                 conn_err(tconn, "AuthChallenge payload too big.\n");
4605                 rv = -1;
4606                 goto fail;
4607         }
4608
4609         peers_ch = kmalloc(pi.size, GFP_NOIO);
4610         if (peers_ch == NULL) {
4611                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4612                 rv = -1;
4613                 goto fail;
4614         }
4615
4616         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4617         if (err) {
4618                 rv = 0;
4619                 goto fail;
4620         }
4621
4622         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4623         response = kmalloc(resp_size, GFP_NOIO);
4624         if (response == NULL) {
4625                 conn_err(tconn, "kmalloc of response failed\n");
4626                 rv = -1;
4627                 goto fail;
4628         }
4629
4630         sg_init_table(&sg, 1);
4631         sg_set_buf(&sg, peers_ch, pi.size);
4632
4633         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4634         if (rv) {
4635                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4636                 rv = -1;
4637                 goto fail;
4638         }
4639
4640         if (!conn_prepare_command(tconn, sock)) {
4641                 rv = 0;
4642                 goto fail;
4643         }
4644         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4645                                 response, resp_size);
4646         if (!rv)
4647                 goto fail;
4648
4649         err = drbd_recv_header(tconn, &pi);
4650         if (err) {
4651                 rv = 0;
4652                 goto fail;
4653         }
4654
4655         if (pi.cmd != P_AUTH_RESPONSE) {
4656                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4657                          cmdname(pi.cmd), pi.cmd);
4658                 rv = 0;
4659                 goto fail;
4660         }
4661
4662         if (pi.size != resp_size) {
4663                 conn_err(tconn, "AuthResponse payload of wrong size.\n");
4664                 rv = 0;
4665                 goto fail;
4666         }
4667
4668         err = drbd_recv_all_warn(tconn, response, resp_size);
4669         if (err) {
4670                 rv = 0;
4671                 goto fail;
4672         }
4673
4674         right_response = kmalloc(resp_size, GFP_NOIO);
4675         if (right_response == NULL) {
4676                 conn_err(tconn, "kmalloc of right_response failed\n");
4677                 rv = -1;
4678                 goto fail;
4679         }
4680
4681         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4682
4683         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4684         if (rv) {
4685                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4686                 rv = -1;
4687                 goto fail;
4688         }
4689
4690         rv = !memcmp(response, right_response, resp_size);
4691
4692         if (rv)
4693                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4694                      resp_size);
4695         else
4696                 rv = -1;
4697
4698  fail:
4699         kfree(peers_ch);
4700         kfree(response);
4701         kfree(right_response);
4702
4703         return rv;
4704 }
4705 #endif
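
/*
 * The exchange implemented above is a symmetric challenge-response;
 * both sides run it concurrently over the data socket:
 *
 *	A --- P_AUTH_CHALLENGE (64 random bytes) ---> B
 *	A <-- P_AUTH_CHALLENGE (64 random bytes) ---- B
 *	A --- P_AUTH_RESPONSE (HMAC(secret, B's challenge)) ---> B
 *	A <-- P_AUTH_RESPONSE (HMAC(secret, A's challenge)) ---- B
 *
 * Each side recomputes the HMAC over its own challenge and memcmp()s
 * it against the peer's response; only on a match is the peer
 * considered authenticated.
 */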
4706
4707 int drbdd_init(struct drbd_thread *thi)
4708 {
4709         struct drbd_tconn *tconn = thi->tconn;
4710         int h;
4711
4712         conn_info(tconn, "receiver (re)started\n");
4713
4714         do {
4715                 h = conn_connect(tconn);
4716                 if (h == 0) {
4717                         conn_disconnect(tconn);
4718                         schedule_timeout_interruptible(HZ);
4719                 }
4720                 if (h == -1) {
4721                         conn_warn(tconn, "Discarding network configuration.\n");
4722                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4723                 }
4724         } while (h == 0);
4725
4726         if (h > 0)
4727                 drbdd(tconn);
4728
4729         conn_disconnect(tconn);
4730
4731         conn_info(tconn, "receiver terminated\n");
4732         return 0;
4733 }
4734
4735 /* ********* acknowledge sender ******** */
4736
4737 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4738 {
4739         struct p_req_state_reply *p = pi->data;
4740         int retcode = be32_to_cpu(p->retcode);
4741
4742         if (retcode >= SS_SUCCESS) {
4743                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4744         } else {
4745                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4746                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4747                          drbd_set_st_err_str(retcode), retcode);
4748         }
4749         wake_up(&tconn->ping_wait);
4750
4751         return 0;
4752 }
4753
4754 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4755 {
4756         struct drbd_conf *mdev;
4757         struct p_req_state_reply *p = pi->data;
4758         int retcode = be32_to_cpu(p->retcode);
4759
4760         mdev = vnr_to_mdev(tconn, pi->vnr);
4761         if (!mdev)
4762                 return -EIO;
4763
4764         if (retcode >= SS_SUCCESS) {
4765                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4766         } else {
4767                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4768                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4769                         drbd_set_st_err_str(retcode), retcode);
4770         }
4771         wake_up(&mdev->state_wait);
4772
4773         return 0;
4774 }
4775
4776 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4777 {
4778         return drbd_send_ping_ack(tconn);
4780 }
4781
4782 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4783 {
4784         /* restore idle timeout */
4785         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4786         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4787                 wake_up(&tconn->ping_wait);
4788
4789         return 0;
4790 }
4791
4792 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4793 {
4794         struct drbd_conf *mdev;
4795         struct p_block_ack *p = pi->data;
4796         sector_t sector = be64_to_cpu(p->sector);
4797         int blksize = be32_to_cpu(p->blksize);
4798
4799         mdev = vnr_to_mdev(tconn, pi->vnr);
4800         if (!mdev)
4801                 return -EIO;
4802
4803         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4804
4805         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4806
4807         if (get_ldev(mdev)) {
4808                 drbd_rs_complete_io(mdev, sector);
4809                 drbd_set_in_sync(mdev, sector, blksize);
4810                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4811                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4812                 put_ldev(mdev);
4813         }
4814         dec_rs_pending(mdev);
4815         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4816
4817         return 0;
4818 }
4819
4820 static int
4821 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4822                               struct rb_root *root, const char *func,
4823                               enum drbd_req_event what, bool missing_ok)
4824 {
4825         struct drbd_request *req;
4826         struct bio_and_error m;
4827
4828         spin_lock_irq(&mdev->tconn->req_lock);
4829         req = find_request(mdev, root, id, sector, missing_ok, func);
4830         if (unlikely(!req)) {
4831                 spin_unlock_irq(&mdev->tconn->req_lock);
4832                 return -EIO;
4833         }
4834         __req_mod(req, what, &m);
4835         spin_unlock_irq(&mdev->tconn->req_lock);
4836
4837         if (m.bio)
4838                 complete_master_bio(mdev, &m);
4839         return 0;
4840 }
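
/*
 * Typical use, as in got_BlockAck() below: the peer echoes back the
 * (block_id, sector) pair we sent with the request; we look the
 * request up in the per-device rb_root under req_lock, feed the
 * matching drbd_req_event into __req_mod(), and complete the master
 * bio outside the lock if that event finished the request.
 */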
4841
4842 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4843 {
4844         struct drbd_conf *mdev;
4845         struct p_block_ack *p = pi->data;
4846         sector_t sector = be64_to_cpu(p->sector);
4847         int blksize = be32_to_cpu(p->blksize);
4848         enum drbd_req_event what;
4849
4850         mdev = vnr_to_mdev(tconn, pi->vnr);
4851         if (!mdev)
4852                 return -EIO;
4853
4854         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4855
4856         if (p->block_id == ID_SYNCER) {
4857                 drbd_set_in_sync(mdev, sector, blksize);
4858                 dec_rs_pending(mdev);
4859                 return 0;
4860         }
4861         switch (pi->cmd) {
4862         case P_RS_WRITE_ACK:
4863                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4864                 break;
4865         case P_WRITE_ACK:
4866                 what = WRITE_ACKED_BY_PEER;
4867                 break;
4868         case P_RECV_ACK:
4869                 what = RECV_ACKED_BY_PEER;
4870                 break;
4871         case P_DISCARD_WRITE:
4872                 what = DISCARD_WRITE;
4873                 break;
4874         case P_RETRY_WRITE:
4875                 what = POSTPONE_WRITE;
4876                 break;
4877         default:
4878                 BUG();
4879         }
4880
4881         return validate_req_change_req_state(mdev, p->block_id, sector,
4882                                              &mdev->write_requests, __func__,
4883                                              what, false);
4884 }
4885
4886 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4887 {
4888         struct drbd_conf *mdev;
4889         struct p_block_ack *p = pi->data;
4890         sector_t sector = be64_to_cpu(p->sector);
4891         int size = be32_to_cpu(p->blksize);
4892         int err;
4893
4894         mdev = vnr_to_mdev(tconn, pi->vnr);
4895         if (!mdev)
4896                 return -EIO;
4897
4898         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4899
4900         if (p->block_id == ID_SYNCER) {
4901                 dec_rs_pending(mdev);
4902                 drbd_rs_failed_io(mdev, sector, size);
4903                 return 0;
4904         }
4905
4906         err = validate_req_change_req_state(mdev, p->block_id, sector,
4907                                             &mdev->write_requests, __func__,
4908                                             NEG_ACKED, true);
4909         if (err) {
4910                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4911                    The master bio might already be completed, therefore the
4912                    request is no longer in the collision hash. */
4913                 /* In Protocol B we might already have got a P_RECV_ACK
4914                    but then get a P_NEG_ACK afterwards. */
4915                 drbd_set_out_of_sync(mdev, sector, size);
4916         }
4917         return 0;
4918 }
4919
4920 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4921 {
4922         struct drbd_conf *mdev;
4923         struct p_block_ack *p = pi->data;
4924         sector_t sector = be64_to_cpu(p->sector);
4925
4926         mdev = vnr_to_mdev(tconn, pi->vnr);
4927         if (!mdev)
4928                 return -EIO;
4929
4930         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4931
4932         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4933             (unsigned long long)sector, be32_to_cpu(p->blksize));
4934
4935         return validate_req_change_req_state(mdev, p->block_id, sector,
4936                                              &mdev->read_requests, __func__,
4937                                              NEG_ACKED, false);
4938 }
4939
4940 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4941 {
4942         struct drbd_conf *mdev;
4943         sector_t sector;
4944         int size;
4945         struct p_block_ack *p = pi->data;
4946
4947         mdev = vnr_to_mdev(tconn, pi->vnr);
4948         if (!mdev)
4949                 return -EIO;
4950
4951         sector = be64_to_cpu(p->sector);
4952         size = be32_to_cpu(p->blksize);
4953
4954         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4955
4956         dec_rs_pending(mdev);
4957
4958         if (get_ldev_if_state(mdev, D_FAILED)) {
4959                 drbd_rs_complete_io(mdev, sector);
4960                 switch (pi->cmd) {
4961                 case P_NEG_RS_DREPLY:
4962                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4963                 case P_RS_CANCEL:
4964                         break;
4965                 default:
4966                         BUG();
4967                 }
4968                 put_ldev(mdev);
4969         }
4970
4971         return 0;
4972 }
4973
4974 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4975 {
4976         struct drbd_conf *mdev;
4977         struct p_barrier_ack *p = pi->data;
4978
4979         mdev = vnr_to_mdev(tconn, pi->vnr);
4980         if (!mdev)
4981                 return -EIO;
4982
4983         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4984
4985         if (mdev->state.conn == C_AHEAD &&
4986             atomic_read(&mdev->ap_in_flight) == 0 &&
4987             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4988                 mdev->start_resync_timer.expires = jiffies + HZ;
4989                 add_timer(&mdev->start_resync_timer);
4990         }
4991
4992         return 0;
4993 }
4994
4995 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4996 {
4997         struct drbd_conf *mdev;
4998         struct p_block_ack *p = pi->data;
4999         struct drbd_work *w;
5000         sector_t sector;
5001         int size;
5002
5003         mdev = vnr_to_mdev(tconn, pi->vnr);
5004         if (!mdev)
5005                 return -EIO;
5006
5007         sector = be64_to_cpu(p->sector);
5008         size = be32_to_cpu(p->blksize);
5009
5010         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5011
5012         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5013                 drbd_ov_out_of_sync_found(mdev, sector, size);
5014         else
5015                 ov_out_of_sync_print(mdev);
5016
5017         if (!get_ldev(mdev))
5018                 return 0;
5019
5020         drbd_rs_complete_io(mdev, sector);
5021         dec_rs_pending(mdev);
5022
5023         --mdev->ov_left;
5024
5025         /* let's advance progress step marks only for every other megabyte */
5026         if ((mdev->ov_left & 0x200) == 0x200)
5027                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5028
5029         if (mdev->ov_left == 0) {
5030                 w = kmalloc(sizeof(*w), GFP_NOIO);
5031                 if (w) {
5032                         w->cb = w_ov_finished;
5033                         w->mdev = mdev;
5034                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5035                 } else {
5036                         dev_err(DEV, "kmalloc(w) failed.\n");
5037                         ov_out_of_sync_print(mdev);
5038                         drbd_resync_finished(mdev);
5039                 }
5040         }
5041         put_ldev(mdev);
5042         return 0;
5043 }
5044
5045 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5046 {
5047         return 0;
5048 }
5049
5050 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5051 {
5052         struct drbd_conf *mdev;
5053         int vnr, not_empty = 0;
5054
5055         do {
5056                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5057                 flush_signals(current);
5058
5059                 rcu_read_lock();
5060                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5061                         kref_get(&mdev->kref);
5062                         rcu_read_unlock();
5063                         if (drbd_finish_peer_reqs(mdev)) {
5064                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5065                                 return 1;
5066                         }
5067                         kref_put(&mdev->kref, &drbd_minor_destroy);
5068                         rcu_read_lock();
5069                 }
5070                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5071
5072                 spin_lock_irq(&tconn->req_lock);
5073                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5074                         not_empty = !list_empty(&mdev->done_ee);
5075                         if (not_empty)
5076                                 break;
5077                 }
5078                 spin_unlock_irq(&tconn->req_lock);
5079                 rcu_read_unlock();
5080         } while (not_empty);
5081
5082         return 0;
5083 }
5084
5085 struct asender_cmd {
5086         size_t pkt_size;
5087         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5088 };
5089
5090 static struct asender_cmd asender_tbl[] = {
5091         [P_PING]            = { 0, got_Ping },
5092         [P_PING_ACK]        = { 0, got_PingAck },
5093         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5094         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5095         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5096         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5097         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5098         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5099         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5100         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5101         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5102         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5103         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5104         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5105         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5106         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5107         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5108 };
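
/*
 * The asender handles fixed-size frames only: for each packet, the
 * expected total is header_size + pkt_size from this table, and
 * pi.size must match exactly; there is no variable payload on the
 * meta socket.  A block ack, for instance, is one header plus
 * sizeof(struct p_block_ack) bytes.
 */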
5109
5110 int drbd_asender(struct drbd_thread *thi)
5111 {
5112         struct drbd_tconn *tconn = thi->tconn;
5113         struct asender_cmd *cmd = NULL;
5114         struct packet_info pi;
5115         int rv;
5116         void *buf    = tconn->meta.rbuf;
5117         int received = 0;
5118         unsigned int header_size = drbd_header_size(tconn);
5119         int expect   = header_size;
5120         bool ping_timeout_active = false;
5121         struct net_conf *nc;
5122         int ping_timeo, tcp_cork, ping_int;
5123
5124         current->policy = SCHED_RR;  /* Make this a realtime task! */
5125         current->rt_priority = 2;    /* more important than all other tasks */
5126
5127         while (get_t_state(thi) == RUNNING) {
5128                 drbd_thread_current_set_cpu(thi);
5129
5130                 rcu_read_lock();
5131                 nc = rcu_dereference(tconn->net_conf);
5132                 ping_timeo = nc->ping_timeo;
5133                 tcp_cork = nc->tcp_cork;
5134                 ping_int = nc->ping_int;
5135                 rcu_read_unlock();
5136
5137                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5138                         if (drbd_send_ping(tconn)) {
5139                                 conn_err(tconn, "drbd_send_ping has failed\n");
5140                                 goto reconnect;
5141                         }
5142                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5143                         ping_timeout_active = true;
5144                 }
5145
5146                 /* TODO: conditionally cork; it may hurt latency if we cork without
5147                    much to send */
5148                 if (tcp_cork)
5149                         drbd_tcp_cork(tconn->meta.socket);
5150                 if (tconn_finish_peer_reqs(tconn)) {
5151                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5152                         goto reconnect;
5153                 }
5154                 /* but unconditionally uncork unless disabled */
5155                 if (tcp_cork)
5156                         drbd_tcp_uncork(tconn->meta.socket);
5157
5158                 /* short circuit, recv_msg would return EINTR anyways. */
5159                 if (signal_pending(current))
5160                         continue;
5161
5162                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5163                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5164
5165                 flush_signals(current);
5166
5167                 /* Note:
5168                  * -EINTR        (on meta) we got a signal
5169                  * -EAGAIN       (on meta) rcvtimeo expired
5170                  * -ECONNRESET   other side closed the connection
5171                  * -ERESTARTSYS  (on data) we got a signal
5172                  * rv <  0       other than above: unexpected error!
5173                  * rv == expected: full header or command
5174                  * rv <  expected: "woken" by signal during receive
5175                  * rv == 0       : "connection shut down by peer"
5176                  */
5177                 if (likely(rv > 0)) {
5178                         received += rv;
5179                         buf      += rv;
5180                 } else if (rv == 0) {
5181                         conn_err(tconn, "meta connection shut down by peer.\n");
5182                         goto reconnect;
5183                 } else if (rv == -EAGAIN) {
5184                         /* If the data socket received something meanwhile,
5185                          * that is good enough: peer is still alive. */
5186                         if (time_after(tconn->last_received,
5187                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5188                                 continue;
5189                         if (ping_timeout_active) {
5190                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5191                                 goto reconnect;
5192                         }
5193                         set_bit(SEND_PING, &tconn->flags);
5194                         continue;
5195                 } else if (rv == -EINTR) {
5196                         continue;
5197                 } else {
5198                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5199                         goto reconnect;
5200                 }
5201
5202                 if (received == expect && cmd == NULL) {
5203                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5204                                 goto reconnect;
5205                         cmd = &asender_tbl[pi.cmd];
5206                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5207                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5208                                          cmdname(pi.cmd), pi.cmd);
5209                                 goto disconnect;
5210                         }
5211                         expect = header_size + cmd->pkt_size;
5212                         if (pi.size != expect - header_size) {
5213                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5214                                         pi.cmd, pi.size);
5215                                 goto reconnect;
5216                         }
5217                 }
5218                 if (received == expect) {
5219                         bool err;
5220
5221                         err = cmd->fn(tconn, &pi);
5222                         if (err) {
5223                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5224                                 goto reconnect;
5225                         }
5226
5227                         tconn->last_received = jiffies;
5228
5229                         if (cmd == &asender_tbl[P_PING_ACK]) {
5230                                 /* restore idle timeout */
5231                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5232                                 ping_timeout_active = false;
5233                         }
5234
5235                         buf      = tconn->meta.rbuf;
5236                         received = 0;
5237                         expect   = header_size;
5238                         cmd      = NULL;
5239                 }
5240         }
5241
5242         if (0) {
5243 reconnect:
5244                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5245         }
5246         if (0) {
5247 disconnect:
5248                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5249         }
5250         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5251
5252         conn_info(tconn, "asender terminated\n");
5253
5254         return 0;
5255 }