drbd: rcu_read_lock() and rcu_dereference() for tconn->net_conf
[linux-2.6-block.git] drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
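/* Flags for opportunistic page allocation: highmem pages are fine, and
 * failure is an expected, handled case, so do not warn about it; without
 * __GFP_WAIT this also never sleeps or does direct reclaim itself. */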
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
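
/* For example, a chain of three pages a -> b -> c looks like this:
 *   *head == a, page_private(a) == (unsigned long)b,
 *   page_private(b) == (unsigned long)c, page_private(c) == 0
 * page_chain_next() and page_chain_for_each() (see drbd_int.h) walk the
 * chain by reading page_private() back as a struct page pointer. */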
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
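
/* Pages handed out here are accounted in mdev->pp_in_use and must come back
 * through drbd_free_pages() below, which drops the matching counter and
 * wakes drbd_pp_wait so waiters in drbd_alloc_pages() can retry. */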
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* Possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all of them ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* See also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
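/* Receive into a kernel buffer: temporarily lift the user/kernel address
 * limit with set_fs(KERNEL_DS) so sock_recvmsg() accepts the kernel-space
 * iovec, then restore the old limit.  Returns bytes received or -errno. */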
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
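
/* Unlike drbd_recv_short() above, this always reads from the connection's
 * data socket with MSG_WAITALL, and a short or failed read moves the
 * connection to C_BROKEN_PIPE so the state machine can tear it down. */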
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
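        /* The *_LOCK bits below keep TCP buffer autotuning from later
         * overriding these explicitly configured sizes. */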
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, try_connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         try_connect_int = nc->try_connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = try_connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, try_connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         try_connect_int = nc->try_connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = try_connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
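        /* MSG_DONTWAIT | MSG_PEEK: probe without blocking and without
         * consuming data.  rr > 0 means the peer is still talking, -EAGAIN
         * merely means nothing is queued right now; anything else (0 on
         * orderly shutdown, or another error) means the socket is dead. */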
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called when a connection is established, or when a new minor gets
813    created within an existing connection */
814 int drbd_connected(int vnr, void *p, void *data)
815 {
816         struct drbd_conf *mdev = (struct drbd_conf *)p;
817         int err;
818
819         atomic_set(&mdev->packet_seq, 0);
820         mdev->peer_seq = 0;
821
822         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
823                 &mdev->tconn->cstate_mutex :
824                 &mdev->own_state_mutex;
825
826         err = drbd_send_sync_param(mdev);
827         if (!err)
828                 err = drbd_send_sizes(mdev, 0, 0);
829         if (!err)
830                 err = drbd_send_uuids(mdev);
831         if (!err)
832                 err = drbd_send_state(mdev);
833         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
834         clear_bit(RESIZE_PENDING, &mdev->flags);
835         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
836         return err;
837 }
838
839 /*
840  * return values:
841  *   1 yes, we have a valid connection
842  *   0 oops, did not work out, please try again
843  *  -1 peer talks different language,
844  *     no point in trying again, please go standalone.
845  *  -2 We do not have a network config...
846  */
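/*
 * Connection setup uses two TCP sockets: each node both connects
 * (drbd_try_connect()) and listens (drbd_wait_for_connect()) until a data
 * and a meta socket exist; the P_INITIAL_DATA / P_INITIAL_META packets sent
 * first on each socket disambiguate crossed connection attempts.
 */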
847 static int drbd_connect(struct drbd_tconn *tconn)
848 {
849         struct socket *sock, *msock;
850         struct net_conf *nc;
851         int timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in drbd_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         down_read(&drbd_cfg_rwsem);
1005         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
1006         up_read(&drbd_cfg_rwsem);
1007         return h;
1008
1009 out_release_sockets:
1010         if (tconn->data.socket) {
1011                 sock_release(tconn->data.socket);
1012                 tconn->data.socket = NULL;
1013         }
1014         if (tconn->meta.socket) {
1015                 sock_release(tconn->meta.socket);
1016                 tconn->meta.socket = NULL;
1017         }
1018         return -1;
1019 }
1020
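/* On-the-wire header formats, distinguished by size and magic:
 *   p_header80  (DRBD_MAGIC):     16-bit command, 16-bit length, no volume
 *   p_header95  (DRBD_MAGIC_BIG): 16-bit command, 32-bit length, no volume
 *   p_header100 (DRBD_MAGIC_100): adds a 16-bit volume number and padding
 * decode_header() below fills struct packet_info from whichever format the
 * agreed protocol version uses. */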
1021 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1022 {
1023         unsigned int header_size = drbd_header_size(tconn);
1024
1025         if (header_size == sizeof(struct p_header100) &&
1026             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1027                 struct p_header100 *h = header;
1028                 if (h->pad != 0) {
1029                         conn_err(tconn, "Header padding is not zero\n");
1030                         return -EINVAL;
1031                 }
1032                 pi->vnr = be16_to_cpu(h->volume);
1033                 pi->cmd = be16_to_cpu(h->command);
1034                 pi->size = be32_to_cpu(h->length);
1035         } else if (header_size == sizeof(struct p_header95) &&
1036                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1037                 struct p_header95 *h = header;
1038                 pi->cmd = be16_to_cpu(h->command);
1039                 pi->size = be32_to_cpu(h->length);
1040                 pi->vnr = 0;
1041         } else if (header_size == sizeof(struct p_header80) &&
1042                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1043                 struct p_header80 *h = header;
1044                 pi->cmd = be16_to_cpu(h->command);
1045                 pi->size = be16_to_cpu(h->length);
1046                 pi->vnr = 0;
1047         } else {
1048                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1049                          be32_to_cpu(*(__be32 *)header),
1050                          tconn->agreed_pro_version);
1051                 return -EINVAL;
1052         }
1053         pi->data = header + header_size;
1054         return 0;
1055 }
1056
1057 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1058 {
1059         void *buffer = tconn->data.rbuf;
1060         int err;
1061
1062         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1063         if (err)
1064                 return err;
1065
1066         err = decode_header(tconn, buffer, pi);
1067         tconn->last_received = jiffies;
1068
1069         return err;
1070 }
1071
1072 static void drbd_flush(struct drbd_conf *mdev)
1073 {
1074         int rv;
1075
1076         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1077                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1078                                         NULL);
1079                 if (rv) {
1080                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1081                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1082                          * don't try again for ANY return value != 0
1083                          * if (rv == -EOPNOTSUPP) */
1084                         drbd_bump_write_ordering(mdev, WO_drain_io);
1085                 }
1086                 put_ldev(mdev);
1087         }
1088 }
1089
1090 /**
1091  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1092  * @mdev:       DRBD device.
1093  * @epoch:      Epoch object.
1094  * @ev:         Epoch event.
1095  */
1096 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1097                                                struct drbd_epoch *epoch,
1098                                                enum epoch_event ev)
1099 {
1100         int epoch_size;
1101         struct drbd_epoch *next_epoch;
1102         enum finish_epoch rv = FE_STILL_LIVE;
1103
1104         spin_lock(&mdev->epoch_lock);
1105         do {
1106                 next_epoch = NULL;
1107
1108                 epoch_size = atomic_read(&epoch->epoch_size);
1109
1110                 switch (ev & ~EV_CLEANUP) {
1111                 case EV_PUT:
1112                         atomic_dec(&epoch->active);
1113                         break;
1114                 case EV_GOT_BARRIER_NR:
1115                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1116                         break;
1117                 case EV_BECAME_LAST:
1118                         /* nothing to do */
1119                         break;
1120                 }
1121
1122                 if (epoch_size != 0 &&
1123                     atomic_read(&epoch->active) == 0 &&
1124                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1125                         if (!(ev & EV_CLEANUP)) {
1126                                 spin_unlock(&mdev->epoch_lock);
1127                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1128                                 spin_lock(&mdev->epoch_lock);
1129                         }
1130                         dec_unacked(mdev);
1131
1132                         if (mdev->current_epoch != epoch) {
1133                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1134                                 list_del(&epoch->list);
1135                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1136                                 mdev->epochs--;
1137                                 kfree(epoch);
1138
1139                                 if (rv == FE_STILL_LIVE)
1140                                         rv = FE_DESTROYED;
1141                         } else {
1142                                 epoch->flags = 0;
1143                                 atomic_set(&epoch->epoch_size, 0);
1144                                 /* atomic_set(&epoch->active, 0); is already zero */
1145                                 if (rv == FE_STILL_LIVE)
1146                                         rv = FE_RECYCLED;
1147                                 wake_up(&mdev->ee_wait);
1148                         }
1149                 }
1150
1151                 if (!next_epoch)
1152                         break;
1153
1154                 epoch = next_epoch;
1155         } while (1);
1156
1157         spin_unlock(&mdev->epoch_lock);
1158
1159         return rv;
1160 }
1161
1162 /**
1163  * drbd_bump_write_ordering() - Fall back to another write ordering method
1164  * @mdev:       DRBD device.
1165  * @wo:         Write ordering method to try.
1166  */
1167 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1168 {
1169         enum write_ordering_e pwo;
1170         static char *write_ordering_str[] = {
1171                 [WO_none] = "none",
1172                 [WO_drain_io] = "drain",
1173                 [WO_bdev_flush] = "flush",
1174         };
1175
1176         pwo = mdev->write_ordering;
1177         wo = min(pwo, wo);
1178         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1179                 wo = WO_drain_io;
1180         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1181                 wo = WO_none;
1182         mdev->write_ordering = wo;
1183         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1184                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1185 }
1186
1187 /**
1188  * drbd_submit_peer_request() - submit the I/O for a peer request to the local backing device
1189  * @mdev:       DRBD device.
1190  * @peer_req:   peer request
1191  * @rw:         flag field, see bio->bi_rw
1192  *
1193  * May spread the pages to multiple bios,
1194  * depending on bio_add_page restrictions.
1195  *
1196  * Returns 0 if all bios have been submitted,
1197  * -ENOMEM if we could not allocate enough bios,
1198  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1199  *  single page to an empty bio (which should never happen and likely indicates
1200  *  that the lower level IO stack is in some way broken). This has been observed
1201  *  on certain Xen deployments.
1202  */
1203 /* TODO allocate from our own bio_set. */
1204 int drbd_submit_peer_request(struct drbd_conf *mdev,
1205                              struct drbd_peer_request *peer_req,
1206                              const unsigned rw, const int fault_type)
1207 {
1208         struct bio *bios = NULL;
1209         struct bio *bio;
1210         struct page *page = peer_req->pages;
1211         sector_t sector = peer_req->i.sector;
1212         unsigned ds = peer_req->i.size;
1213         unsigned n_bios = 0;
1214         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1215         int err = -ENOMEM;
1216
1217         /* In most cases, we will only need one bio.  But in case the lower
1218          * level restrictions happen to be different at this offset on this
1219          * side than those of the sending peer, we may need to submit the
1220          * request in more than one bio.
1221          *
1222          * Plain bio_alloc is good enough here, this is no DRBD internally
1223          * generated bio, but a bio allocated on behalf of the peer.
1224          */
1225 next_bio:
1226         bio = bio_alloc(GFP_NOIO, nr_pages);
1227         if (!bio) {
1228                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1229                 goto fail;
1230         }
1231         /* > peer_req->i.sector, unless this is the first bio */
1232         bio->bi_sector = sector;
1233         bio->bi_bdev = mdev->ldev->backing_bdev;
1234         bio->bi_rw = rw;
1235         bio->bi_private = peer_req;
1236         bio->bi_end_io = drbd_peer_request_endio;
1237
1238         bio->bi_next = bios;
1239         bios = bio;
1240         ++n_bios;
1241
1242         page_chain_for_each(page) {
1243                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1244                 if (!bio_add_page(bio, page, len, 0)) {
1245                         /* A single page must always be possible!
1246                          * But in case it fails anyways,
1247                          * we deal with it, and complain (below). */
1248                         if (bio->bi_vcnt == 0) {
1249                                 dev_err(DEV,
1250                                         "bio_add_page failed for len=%u, "
1251                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1252                                         len, (unsigned long long)bio->bi_sector);
1253                                 err = -ENOSPC;
1254                                 goto fail;
1255                         }
1256                         goto next_bio;
1257                 }
1258                 ds -= len;
1259                 sector += len >> 9;
1260                 --nr_pages;
1261         }
1262         D_ASSERT(page == NULL);
1263         D_ASSERT(ds == 0);
1264
1265         atomic_set(&peer_req->pending_bios, n_bios);
1266         do {
1267                 bio = bios;
1268                 bios = bios->bi_next;
1269                 bio->bi_next = NULL;
1270
1271                 drbd_generic_make_request(mdev, fault_type, bio);
1272         } while (bios);
1273         return 0;
1274
1275 fail:
1276         while (bios) {
1277                 bio = bios;
1278                 bios = bios->bi_next;
1279                 bio_put(bio);
1280         }
1281         return err;
1282 }
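
/* Callers (e.g. recv_resync_read() further down) set peer_req->w.cb and put
 * the request on one of the ee lists before calling this, so the completion
 * path (drbd_peer_request_endio(), then the asender via
 * drbd_finish_peer_reqs()) can find and finish it. */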
1283
1284 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1285                                              struct drbd_peer_request *peer_req)
1286 {
1287         struct drbd_interval *i = &peer_req->i;
1288
1289         drbd_remove_interval(&mdev->write_requests, i);
1290         drbd_clear_interval(i);
1291
1292         /* Wake up any processes waiting for this peer request to complete.  */
1293         if (i->waiting)
1294                 wake_up(&mdev->misc_wait);
1295 }
1296
1297 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1298 {
1299         struct drbd_conf *mdev;
1300         int rv;
1301         struct p_barrier *p = pi->data;
1302         struct drbd_epoch *epoch;
1303
1304         mdev = vnr_to_mdev(tconn, pi->vnr);
1305         if (!mdev)
1306                 return -EIO;
1307
1308         inc_unacked(mdev);
1309
1310         mdev->current_epoch->barrier_nr = p->barrier;
1311         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1312
1313         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1314          * the activity log, which means it would not be resynced in case the
1315          * R_PRIMARY crashes now.
1316          * Therefore we must send the barrier_ack after the barrier request was
1317          * completed. */
1318         switch (mdev->write_ordering) {
1319         case WO_none:
1320                 if (rv == FE_RECYCLED)
1321                         return 0;
1322
1323                 /* receiver context, in the writeout path of the other node.
1324                  * avoid potential distributed deadlock */
1325                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1326                 if (epoch)
1327                         break;
1328                 else
1329                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1330                         /* Fall through */
1331
1332         case WO_bdev_flush:
1333         case WO_drain_io:
1334                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1335                 drbd_flush(mdev);
1336
1337                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1338                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1339                         if (epoch)
1340                                 break;
1341                 }
1342
1343                 epoch = mdev->current_epoch;
1344                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1345
1346                 D_ASSERT(atomic_read(&epoch->active) == 0);
1347                 D_ASSERT(epoch->flags == 0);
1348
1349                 return 0;
1350         default:
1351                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1352                 return -EIO;
1353         }
1354
1355         epoch->flags = 0;
1356         atomic_set(&epoch->epoch_size, 0);
1357         atomic_set(&epoch->active, 0);
1358
1359         spin_lock(&mdev->epoch_lock);
1360         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1361                 list_add(&epoch->list, &mdev->current_epoch->list);
1362                 mdev->current_epoch = epoch;
1363                 mdev->epochs++;
1364         } else {
1365                 /* The current_epoch got recycled while we allocated this one... */
1366                 kfree(epoch);
1367         }
1368         spin_unlock(&mdev->epoch_lock);
1369
1370         return 0;
1371 }
1372
1373 /* used from receive_RSDataReply (recv_resync_read)
1374  * and from receive_Data */
1375 static struct drbd_peer_request *
1376 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1377               int data_size) __must_hold(local)
1378 {
1379         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1380         struct drbd_peer_request *peer_req;
1381         struct page *page;
1382         int dgs, ds, err;
1383         void *dig_in = mdev->tconn->int_dig_in;
1384         void *dig_vv = mdev->tconn->int_dig_vv;
1385         unsigned long *data;
1386
1387         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1388                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1389
1390         if (dgs) {
1391                 /*
1392                  * FIXME: Receive the incoming digest into the receive buffer
1393                  *        here, together with its struct p_data?
1394                  */
1395                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1396                 if (err)
1397                         return NULL;
1398         }
1399
1400         data_size -= dgs;
1401
1402         if (!expect(data_size != 0))
1403                 return NULL;
1404         if (!expect(IS_ALIGNED(data_size, 512)))
1405                 return NULL;
1406         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1407                 return NULL;
1408
1409         /* even though we trust our peer,
1410          * we sometimes have to double check. */
1411         if (sector + (data_size>>9) > capacity) {
1412                 dev_err(DEV, "request from peer beyond end of local disk: "
1413                         "capacity: %llus < sector: %llus + size: %u\n",
1414                         (unsigned long long)capacity,
1415                         (unsigned long long)sector, data_size);
1416                 return NULL;
1417         }
1418
1419         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1420          * "criss-cross" setup, that might cause write-out on some other DRBD,
1421          * which in turn might block on the other node at this very place.  */
1422         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1423         if (!peer_req)
1424                 return NULL;
1425
1426         ds = data_size;
1427         page = peer_req->pages;
1428         page_chain_for_each(page) {
1429                 unsigned len = min_t(int, ds, PAGE_SIZE);
1430                 data = kmap(page);
1431                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1432                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1433                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1434                         data[0] = data[0] ^ (unsigned long)-1;
1435                 }
1436                 kunmap(page);
1437                 if (err) {
1438                         drbd_free_peer_req(mdev, peer_req);
1439                         return NULL;
1440                 }
1441                 ds -= len;
1442         }
1443
1444         if (dgs) {
1445                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1446                 if (memcmp(dig_in, dig_vv, dgs)) {
1447                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1448                                 (unsigned long long)sector, data_size);
1449                         drbd_free_peer_req(mdev, peer_req);
1450                         return NULL;
1451                 }
1452         }
1453         mdev->recv_cnt += data_size>>9;
1454         return peer_req;
1455 }
1456
1457 /* drbd_drain_block() just takes a data block
1458  * out of the socket input buffer, and discards it.
1459  */
1460 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1461 {
1462         struct page *page;
1463         int err = 0;
1464         void *data;
1465
1466         if (!data_size)
1467                 return 0;
1468
1469         page = drbd_alloc_pages(mdev, 1, 1);
        if (!page)
                return -ENOMEM; /* only fails here if we were signalled */
1470
1471         data = kmap(page);
1472         while (data_size) {
1473                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1474
1475                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1476                 if (err)
1477                         break;
1478                 data_size -= len;
1479         }
1480         kunmap(page);
1481         drbd_free_pages(mdev, page, 0);
1482         return err;
1483 }
1484
1485 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1486                            sector_t sector, int data_size)
1487 {
1488         struct bio_vec *bvec;
1489         struct bio *bio;
1490         int dgs, err, i, expect;
1491         void *dig_in = mdev->tconn->int_dig_in;
1492         void *dig_vv = mdev->tconn->int_dig_vv;
1493
1494         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1495                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1496
1497         if (dgs) {
1498                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1499                 if (err)
1500                         return err;
1501         }
1502
1503         data_size -= dgs;
1504
1505         /* optimistically update recv_cnt.  if receiving fails below,
1506          * we disconnect anyways, and counters will be reset. */
1507         mdev->recv_cnt += data_size>>9;
1508
1509         bio = req->master_bio;
1510         D_ASSERT(sector == bio->bi_sector);
1511
1512         bio_for_each_segment(bvec, bio, i) {
1513                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1514                 expect = min_t(int, data_size, bvec->bv_len);
1515                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1516                 kunmap(bvec->bv_page);
1517                 if (err)
1518                         return err;
1519                 data_size -= expect;
1520         }
1521
1522         if (dgs) {
1523                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1524                 if (memcmp(dig_in, dig_vv, dgs)) {
1525                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1526                         return -EINVAL;
1527                 }
1528         }
1529
1530         D_ASSERT(data_size == 0);
1531         return 0;
1532 }
1533
1534 /*
1535  * e_end_resync_block() is called in asender context via
1536  * drbd_finish_peer_reqs().
1537  */
1538 static int e_end_resync_block(struct drbd_work *w, int unused)
1539 {
1540         struct drbd_peer_request *peer_req =
1541                 container_of(w, struct drbd_peer_request, w);
1542         struct drbd_conf *mdev = w->mdev;
1543         sector_t sector = peer_req->i.sector;
1544         int err;
1545
1546         D_ASSERT(drbd_interval_empty(&peer_req->i));
1547
1548         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1549                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1550                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1551         } else {
1552                 /* Record failure to sync */
1553                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1554
1555                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1556         }
1557         dec_unacked(mdev);
1558
1559         return err;
1560 }
1561
1562 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1563 {
1564         struct drbd_peer_request *peer_req;
1565
1566         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1567         if (!peer_req)
1568                 goto fail;
1569
1570         dec_rs_pending(mdev);
1571
1572         inc_unacked(mdev);
1573         /* corresponding dec_unacked() in e_end_resync_block(),
1574          * or in _drbd_clear_done_ee(), respectively */
1575
1576         peer_req->w.cb = e_end_resync_block;
1577
1578         spin_lock_irq(&mdev->tconn->req_lock);
1579         list_add(&peer_req->w.list, &mdev->sync_ee);
1580         spin_unlock_irq(&mdev->tconn->req_lock);
1581
1582         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1583         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1584                 return 0;
1585
1586         /* don't care for the reason here */
1587         dev_err(DEV, "submit failed, triggering re-connect\n");
1588         spin_lock_irq(&mdev->tconn->req_lock);
1589         list_del(&peer_req->w.list);
1590         spin_unlock_irq(&mdev->tconn->req_lock);
1591
1592         drbd_free_peer_req(mdev, peer_req);
1593 fail:
1594         put_ldev(mdev);
1595         return -EIO;
1596 }
1597
1598 static struct drbd_request *
1599 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1600              sector_t sector, bool missing_ok, const char *func)
1601 {
1602         struct drbd_request *req;
1603
1604         /* Request object according to our peer */
1605         req = (struct drbd_request *)(unsigned long)id;
1606         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1607                 return req;
1608         if (!missing_ok) {
1609                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1610                         (unsigned long)id, (unsigned long long)sector);
1611         }
1612         return NULL;
1613 }
1614
1615 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1616 {
1617         struct drbd_conf *mdev;
1618         struct drbd_request *req;
1619         sector_t sector;
1620         int err;
1621         struct p_data *p = pi->data;
1622
1623         mdev = vnr_to_mdev(tconn, pi->vnr);
1624         if (!mdev)
1625                 return -EIO;
1626
1627         sector = be64_to_cpu(p->sector);
1628
1629         spin_lock_irq(&mdev->tconn->req_lock);
1630         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1631         spin_unlock_irq(&mdev->tconn->req_lock);
1632         if (unlikely(!req))
1633                 return -EIO;
1634
1635         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1636          * special casing it there for the various failure cases.
1637          * still no race with drbd_fail_pending_reads */
1638         err = recv_dless_read(mdev, req, sector, pi->size);
1639         if (!err)
1640                 req_mod(req, DATA_RECEIVED);
1641         /* else: nothing. handled from drbd_disconnect...
1642          * I don't think we may complete this just yet
1643          * in case we are "on-disconnect: freeze" */
1644
1645         return err;
1646 }
1647
1648 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1649 {
1650         struct drbd_conf *mdev;
1651         sector_t sector;
1652         int err;
1653         struct p_data *p = pi->data;
1654
1655         mdev = vnr_to_mdev(tconn, pi->vnr);
1656         if (!mdev)
1657                 return -EIO;
1658
1659         sector = be64_to_cpu(p->sector);
1660         D_ASSERT(p->block_id == ID_SYNCER);
1661
1662         if (get_ldev(mdev)) {
1663                 /* data is submitted to disk within recv_resync_read.
1664                  * corresponding put_ldev done below on error,
1665                  * or in drbd_peer_request_endio. */
1666                 err = recv_resync_read(mdev, sector, pi->size);
1667         } else {
1668                 if (__ratelimit(&drbd_ratelimit_state))
1669                         dev_err(DEV, "Can not write resync data to local disk.\n");
1670
1671                 err = drbd_drain_block(mdev, pi->size);
1672
1673                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1674         }
1675
1676         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1677
1678         return err;
1679 }
1680
1681 static int w_restart_write(struct drbd_work *w, int cancel)
1682 {
1683         struct drbd_request *req = container_of(w, struct drbd_request, w);
1684         struct drbd_conf *mdev = w->mdev;
1685         struct bio *bio;
1686         unsigned long start_time;
1687         unsigned long flags;
1688
1689         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1690         if (!expect(req->rq_state & RQ_POSTPONED)) {
1691                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1692                 return -EIO;
1693         }
1694         bio = req->master_bio;
1695         start_time = req->start_time;
1696         /* Postponed requests will not have their master_bio completed!  */
1697         __req_mod(req, DISCARD_WRITE, NULL);
1698         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1699
1700         while (__drbd_make_request(mdev, bio, start_time))
1701                 /* retry */ ;
1702         return 0;
1703 }
1704
1705 static void restart_conflicting_writes(struct drbd_conf *mdev,
1706                                        sector_t sector, int size)
1707 {
1708         struct drbd_interval *i;
1709         struct drbd_request *req;
1710
1711         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1712                 if (!i->local)
1713                         continue;
1714                 req = container_of(i, struct drbd_request, i);
1715                 if (req->rq_state & RQ_LOCAL_PENDING ||
1716                     !(req->rq_state & RQ_POSTPONED))
1717                         continue;
1718                 if (expect(list_empty(&req->w.list))) {
1719                         req->w.mdev = mdev;
1720                         req->w.cb = w_restart_write;
1721                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1722                 }
1723         }
1724 }
1725
1726 /*
1727  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1728  */
1729 static int e_end_block(struct drbd_work *w, int cancel)
1730 {
1731         struct drbd_peer_request *peer_req =
1732                 container_of(w, struct drbd_peer_request, w);
1733         struct drbd_conf *mdev = w->mdev;
1734         sector_t sector = peer_req->i.sector;
1735         int err = 0, pcmd;
1736
1737         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1738                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1739                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1740                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1741                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1742                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1743                         err = drbd_send_ack(mdev, pcmd, peer_req);
1744                         if (pcmd == P_RS_WRITE_ACK)
1745                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1746                 } else {
1747                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1748                         /* we expect it to be marked out of sync anyways...
1749                          * maybe assert this?  */
1750                 }
1751                 dec_unacked(mdev);
1752         }
1753         /* we delete from the conflict detection hash _after_ we sent out the
1754          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1755         if (mdev->tconn->net_conf->two_primaries) {
1756                 spin_lock_irq(&mdev->tconn->req_lock);
1757                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1758                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1759                 if (peer_req->flags & EE_RESTART_REQUESTS)
1760                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1761                 spin_unlock_irq(&mdev->tconn->req_lock);
1762         } else
1763                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1764
1765         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1766
1767         return err;
1768 }
1769
1770 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1771 {
1772         struct drbd_conf *mdev = w->mdev;
1773         struct drbd_peer_request *peer_req =
1774                 container_of(w, struct drbd_peer_request, w);
1775         int err;
1776
1777         err = drbd_send_ack(mdev, ack, peer_req);
1778         dec_unacked(mdev);
1779
1780         return err;
1781 }
1782
1783 static int e_send_discard_write(struct drbd_work *w, int unused)
1784 {
1785         return e_send_ack(w, P_DISCARD_WRITE);
1786 }
1787
1788 static int e_send_retry_write(struct drbd_work *w, int unused)
1789 {
1790         struct drbd_tconn *tconn = w->mdev->tconn;
1791
1792         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1793                              P_RETRY_WRITE : P_DISCARD_WRITE);
1794 }
1795
1796 static bool seq_greater(u32 a, u32 b)
1797 {
1798         /*
1799          * We assume 32-bit wrap-around here.
1800          * For 24-bit wrap-around, we would have to shift:
1801          *  a <<= 8; b <<= 8;
1802          */
1803         return (s32)a - (s32)b > 0;
1804 }
1805
1806 static u32 seq_max(u32 a, u32 b)
1807 {
1808         return seq_greater(a, b) ? a : b;
1809 }
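
/*
 * Editor's illustration (not part of the driver): how the signed-difference
 * trick in seq_greater() survives 32-bit wrap-around.  A hedged, standalone
 * sketch one could compile in userspace; the names below are made up.
 */
#if 0
#include <assert.h>
#include <stdint.h>

static void seq_wrap_example(void)
{
	uint32_t old_seq = 0xfffffffeu;	/* counter shortly before the wrap */
	uint32_t new_seq = 5u;		/* counter shortly after the wrap  */

	/* seq_greater(new_seq, old_seq): the signed difference is small and
	 * positive, so new_seq is correctly treated as the later one ...   */
	assert((int32_t)(new_seq - old_seq) > 0);

	/* ... even though a plain unsigned comparison gets it backwards.   */
	assert(!(new_seq > old_seq));
}
#endif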
1810
1811 static bool need_peer_seq(struct drbd_conf *mdev)
1812 {
1813         struct drbd_tconn *tconn = mdev->tconn;
1814
1815         /*
1816          * We only need to keep track of the last packet_seq number of our peer
1817          * if we are in dual-primary mode and we have the discard flag set; see
1818          * handle_write_conflicts().
1819          */
1820         return tconn->net_conf->two_primaries &&
1821                test_bit(DISCARD_CONCURRENT, &tconn->flags);
1822 }
1823
1824 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1825 {
1826         unsigned int newest_peer_seq;
1827
1828         if (need_peer_seq(mdev)) {
1829                 spin_lock(&mdev->peer_seq_lock);
1830                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1831                 mdev->peer_seq = newest_peer_seq;
1832                 spin_unlock(&mdev->peer_seq_lock);
1833                 /* wake up only if we actually changed mdev->peer_seq */
1834                 if (peer_seq == newest_peer_seq)
1835                         wake_up(&mdev->seq_wait);
1836         }
1837 }
1838
1839 /* Called from receive_Data.
1840  * Synchronize packets on sock with packets on msock.
1841  *
1842  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1843  * packet traveling on msock, they are still processed in the order they have
1844  * been sent.
1845  *
1846  * Note: we don't care for Ack packets overtaking P_DATA packets.
1847  *
1848  * In case packet_seq is larger than mdev->peer_seq, there are
1849  * outstanding packets on the msock. We wait for them to arrive.
1850  * In case we are the logically next packet, we update mdev->peer_seq
1851  * ourselves. Correctly handles 32bit wrap around.
1852  *
1853  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1854  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1855  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1856  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1857  *
1858  * returns 0 if we may process the packet,
1859  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1860 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1861 {
1862         DEFINE_WAIT(wait);
1863         long timeout;
1864         int ret;
1865
1866         if (!need_peer_seq(mdev))
1867                 return 0;
1868
1869         spin_lock(&mdev->peer_seq_lock);
1870         for (;;) {
1871                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1872                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1873                         ret = 0;
1874                         break;
1875                 }
1876                 if (signal_pending(current)) {
1877                         ret = -ERESTARTSYS;
1878                         break;
1879                 }
1880                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1881                 spin_unlock(&mdev->peer_seq_lock);
1882                 rcu_read_lock();
1883                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1884                 rcu_read_unlock();
1885                 timeout = schedule_timeout(timeout);
1886                 spin_lock(&mdev->peer_seq_lock);
1887                 if (!timeout) {
1888                         ret = -ETIMEDOUT;
1889                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1890                         break;
1891                 }
1892         }
1893         spin_unlock(&mdev->peer_seq_lock);
1894         finish_wait(&mdev->seq_wait, &wait);
1895         return ret;
1896 }
1897
1898 /* see also bio_flags_to_wire():
1899  * we map DRBD_REQ_* semantically to data packet flags and back,
1900  * because we may replicate to other kernel versions. */
1901 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1902 {
1903         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1904                 (dpf & DP_FUA ? REQ_FUA : 0) |
1905                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1906                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1907 }
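
/*
 * Editor's sketch (assumption, not the driver's code): the matching
 * bio_flags_to_wire() referenced above lives elsewhere in DRBD.  A minimal
 * inverse along these lines would keep the mapping round-trip safe for the
 * four flags handled here; the function name is hypothetical.
 */
#if 0
static u32 example_bio_flags_to_wire(unsigned long bi_rw)
{
	return  (bi_rw & REQ_SYNC    ? DP_RW_SYNC : 0) |
		(bi_rw & REQ_FUA     ? DP_FUA     : 0) |
		(bi_rw & REQ_FLUSH   ? DP_FLUSH   : 0) |
		(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}
#endif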
1908
1909 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1910                                     unsigned int size)
1911 {
1912         struct drbd_interval *i;
1913
1914     repeat:
1915         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1916                 struct drbd_request *req;
1917                 struct bio_and_error m;
1918
1919                 if (!i->local)
1920                         continue;
1921                 req = container_of(i, struct drbd_request, i);
1922                 if (!(req->rq_state & RQ_POSTPONED))
1923                         continue;
1924                 req->rq_state &= ~RQ_POSTPONED;
1925                 __req_mod(req, NEG_ACKED, &m);
1926                 spin_unlock_irq(&mdev->tconn->req_lock);
1927                 if (m.bio)
1928                         complete_master_bio(mdev, &m);
1929                 spin_lock_irq(&mdev->tconn->req_lock);
1930                 goto repeat;
1931         }
1932 }
1933
1934 static int handle_write_conflicts(struct drbd_conf *mdev,
1935                                   struct drbd_peer_request *peer_req)
1936 {
1937         struct drbd_tconn *tconn = mdev->tconn;
1938         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1939         sector_t sector = peer_req->i.sector;
1940         const unsigned int size = peer_req->i.size;
1941         struct drbd_interval *i;
1942         bool equal;
1943         int err;
1944
1945         /*
1946          * Inserting the peer request into the write_requests tree will prevent
1947          * new conflicting local requests from being added.
1948          */
1949         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1950
1951     repeat:
1952         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1953                 if (i == &peer_req->i)
1954                         continue;
1955
1956                 if (!i->local) {
1957                         /*
1958                          * Our peer has sent a conflicting remote request; this
1959                          * should not happen in a two-node setup.  Wait for the
1960                          * earlier peer request to complete.
1961                          */
1962                         err = drbd_wait_misc(mdev, i);
1963                         if (err)
1964                                 goto out;
1965                         goto repeat;
1966                 }
1967
1968                 equal = i->sector == sector && i->size == size;
1969                 if (resolve_conflicts) {
1970                         /*
1971                          * If the peer request is fully contained within the
1972                          * overlapping request, it can be discarded; otherwise,
1973                          * it will be retried once all overlapping requests
1974                          * have completed.
1975                          */
1976                         bool discard = i->sector <= sector && i->sector +
1977                                        (i->size >> 9) >= sector + (size >> 9);
1978
1979                         if (!equal)
1980                                 dev_alert(DEV, "Concurrent writes detected: "
1981                                                "local=%llus +%u, remote=%llus +%u, "
1982                                                "assuming %s came first\n",
1983                                           (unsigned long long)i->sector, i->size,
1984                                           (unsigned long long)sector, size,
1985                                           discard ? "local" : "remote");
1986
1987                         inc_unacked(mdev);
1988                         peer_req->w.cb = discard ? e_send_discard_write :
1989                                                    e_send_retry_write;
1990                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1991                         wake_asender(mdev->tconn);
1992
1993                         err = -ENOENT;
1994                         goto out;
1995                 } else {
1996                         struct drbd_request *req =
1997                                 container_of(i, struct drbd_request, i);
1998
1999                         if (!equal)
2000                                 dev_alert(DEV, "Concurrent writes detected: "
2001                                                "local=%llus +%u, remote=%llus +%u\n",
2002                                           (unsigned long long)i->sector, i->size,
2003                                           (unsigned long long)sector, size);
2004
2005                         if (req->rq_state & RQ_LOCAL_PENDING ||
2006                             !(req->rq_state & RQ_POSTPONED)) {
2007                                 /*
2008                                  * Wait for the node with the discard flag to
2009                                  * decide if this request will be discarded or
2010                                  * retried.  Requests that are discarded will
2011                                  * disappear from the write_requests tree.
2012                                  *
2013                                  * In addition, wait for the conflicting
2014                                  * request to finish locally before submitting
2015                                  * the conflicting peer request.
2016                                  */
2017                                 err = drbd_wait_misc(mdev, &req->i);
2018                                 if (err) {
2019                                         _conn_request_state(mdev->tconn,
2020                                                             NS(conn, C_TIMEOUT),
2021                                                             CS_HARD);
2022                                         fail_postponed_requests(mdev, sector, size);
2023                                         goto out;
2024                                 }
2025                                 goto repeat;
2026                         }
2027                         /*
2028                          * Remember to restart the conflicting requests after
2029                          * the new peer request has completed.
2030                          */
2031                         peer_req->flags |= EE_RESTART_REQUESTS;
2032                 }
2033         }
2034         err = 0;
2035
2036     out:
2037         if (err)
2038                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2039         return err;
2040 }
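
/*
 * Worked example for the "fully contained" test in handle_write_conflicts()
 * above (editor's illustration, numbers invented): a local request covering
 * sectors 1000..1063 (32 KiB) fully contains a peer request covering sectors
 * 1008..1015 (4 KiB), since 1000 <= 1008 and 1000 + 64 >= 1008 + 8; with the
 * discard flag set, the peer request is answered with P_DISCARD_WRITE instead
 * of being submitted.  A peer request that only partially overlaps is answered
 * with a retry instead (P_RETRY_WRITE on protocol 100 and newer).
 */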
2041
2042 /* mirrored write */
2043 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2044 {
2045         struct drbd_conf *mdev;
2046         sector_t sector;
2047         struct drbd_peer_request *peer_req;
2048         struct p_data *p = pi->data;
2049         u32 peer_seq = be32_to_cpu(p->seq_num);
2050         int rw = WRITE;
2051         u32 dp_flags;
2052         int err;
2053
2054         mdev = vnr_to_mdev(tconn, pi->vnr);
2055         if (!mdev)
2056                 return -EIO;
2057
2058         if (!get_ldev(mdev)) {
2059                 int err2;
2060
2061                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2062                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2063                 atomic_inc(&mdev->current_epoch->epoch_size);
2064                 err2 = drbd_drain_block(mdev, pi->size);
2065                 if (!err)
2066                         err = err2;
2067                 return err;
2068         }
2069
2070         /*
2071          * Corresponding put_ldev done either below (on various errors), or in
2072          * drbd_peer_request_endio, if we successfully submit the data at the
2073          * end of this function.
2074          */
2075
2076         sector = be64_to_cpu(p->sector);
2077         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2078         if (!peer_req) {
2079                 put_ldev(mdev);
2080                 return -EIO;
2081         }
2082
2083         peer_req->w.cb = e_end_block;
2084
2085         dp_flags = be32_to_cpu(p->dp_flags);
2086         rw |= wire_flags_to_bio(mdev, dp_flags);
2087
2088         if (dp_flags & DP_MAY_SET_IN_SYNC)
2089                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2090
2091         spin_lock(&mdev->epoch_lock);
2092         peer_req->epoch = mdev->current_epoch;
2093         atomic_inc(&peer_req->epoch->epoch_size);
2094         atomic_inc(&peer_req->epoch->active);
2095         spin_unlock(&mdev->epoch_lock);
2096
2097         if (mdev->tconn->net_conf->two_primaries) {
2098                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2099                 if (err)
2100                         goto out_interrupted;
2101                 spin_lock_irq(&mdev->tconn->req_lock);
2102                 err = handle_write_conflicts(mdev, peer_req);
2103                 if (err) {
2104                         spin_unlock_irq(&mdev->tconn->req_lock);
2105                         if (err == -ENOENT) {
2106                                 put_ldev(mdev);
2107                                 return 0;
2108                         }
2109                         goto out_interrupted;
2110                 }
2111         } else
2112                 spin_lock_irq(&mdev->tconn->req_lock);
2113         list_add(&peer_req->w.list, &mdev->active_ee);
2114         spin_unlock_irq(&mdev->tconn->req_lock);
2115
2116         if (mdev->tconn->agreed_pro_version < 100) {
2117                 rcu_read_lock();
2118                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2119                 case DRBD_PROT_C:
2120                         dp_flags |= DP_SEND_WRITE_ACK;
2121                         break;
2122                 case DRBD_PROT_B:
2123                         dp_flags |= DP_SEND_RECEIVE_ACK;
2124                         break;
2125                 }
2126                 rcu_read_unlock();
2127         }
2128
2129         if (dp_flags & DP_SEND_WRITE_ACK) {
2130                 peer_req->flags |= EE_SEND_WRITE_ACK;
2131                 inc_unacked(mdev);
2132                 /* corresponding dec_unacked() in e_end_block(),
2133                  * or in _drbd_clear_done_ee(), respectively */
2134         }
2135
2136         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2137                 /* I really don't like it that the receiver thread
2138                  * sends on the msock, but anyways */
2139                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2140         }
2141
2142         if (mdev->state.pdsk < D_INCONSISTENT) {
2143                 /* In case we have the only disk of the cluster: mark the range out of sync. */
2144                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2145                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2146                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2147                 drbd_al_begin_io(mdev, &peer_req->i);
2148         }
2149
2150         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2151         if (!err)
2152                 return 0;
2153
2154         /* don't care for the reason here */
2155         dev_err(DEV, "submit failed, triggering re-connect\n");
2156         spin_lock_irq(&mdev->tconn->req_lock);
2157         list_del(&peer_req->w.list);
2158         drbd_remove_epoch_entry_interval(mdev, peer_req);
2159         spin_unlock_irq(&mdev->tconn->req_lock);
2160         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2161                 drbd_al_complete_io(mdev, &peer_req->i);
2162
2163 out_interrupted:
2164         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2165         put_ldev(mdev);
2166         drbd_free_peer_req(mdev, peer_req);
2167         return err;
2168 }
2169
2170 /* We may throttle resync, if the lower device seems to be busy,
2171  * and current sync rate is above c_min_rate.
2172  *
2173  * To decide whether or not the lower device is busy, we use a scheme similar
2174  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2175  * amount of activity (more than 64 sectors) that we cannot account for with
2176  * our own resync activity, the device obviously is "busy".
2177  *
2178  * The current sync rate used here uses only the most recent two step marks,
2179  * to have a short time average so we can react faster.
2180  */
2181 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2182 {
2183         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2184         unsigned long db, dt, dbdt;
2185         struct lc_element *tmp;
2186         int curr_events;
2187         int throttle = 0;
2188
2189         /* feature disabled? */
2190         if (mdev->ldev->dc.c_min_rate == 0)
2191                 return 0;
2192
2193         spin_lock_irq(&mdev->al_lock);
2194         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2195         if (tmp) {
2196                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2197                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2198                         spin_unlock_irq(&mdev->al_lock);
2199                         return 0;
2200                 }
2201                 /* Do not slow down if app IO is already waiting for this extent */
2202         }
2203         spin_unlock_irq(&mdev->al_lock);
2204
2205         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2206                       (int)part_stat_read(&disk->part0, sectors[1]) -
2207                         atomic_read(&mdev->rs_sect_ev);
2208
2209         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2210                 unsigned long rs_left;
2211                 int i;
2212
2213                 mdev->rs_last_events = curr_events;
2214
2215                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2216                  * approx. */
2217                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2218
2219                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2220                         rs_left = mdev->ov_left;
2221                 else
2222                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2223
2224                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2225                 if (!dt)
2226                         dt++;
2227                 db = mdev->rs_mark_left[i] - rs_left;
2228                 dbdt = Bit2KB(db/dt);
2229
2230                 if (dbdt > mdev->ldev->dc.c_min_rate)
2231                         throttle = 1;
2232         }
2233         return throttle;
2234 }
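
/*
 * Worked example for the rate check above (editor's illustration, all
 * numbers invented): assume the two most recent sync marks are dt = 3
 * seconds apart and rs_left dropped by db = 30720 bitmap bits in that
 * window.  With the usual 4 KiB covered per bitmap bit, Bit2KB(30720 / 3)
 * gives dbdt = 40960 KiB/s; if the configured c_min_rate is below that,
 * the resync is considered fast enough and throttle is set.
 */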
2235
2236
2237 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2238 {
2239         struct drbd_conf *mdev;
2240         sector_t sector;
2241         sector_t capacity;
2242         struct drbd_peer_request *peer_req;
2243         struct digest_info *di = NULL;
2244         int size, verb;
2245         unsigned int fault_type;
2246         struct p_block_req *p = pi->data;
2247
2248         mdev = vnr_to_mdev(tconn, pi->vnr);
2249         if (!mdev)
2250                 return -EIO;
2251         capacity = drbd_get_capacity(mdev->this_bdev);
2252
2253         sector = be64_to_cpu(p->sector);
2254         size   = be32_to_cpu(p->blksize);
2255
2256         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2257                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2258                                 (unsigned long long)sector, size);
2259                 return -EINVAL;
2260         }
2261         if (sector + (size>>9) > capacity) {
2262                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2263                                 (unsigned long long)sector, size);
2264                 return -EINVAL;
2265         }
2266
2267         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2268                 verb = 1;
2269                 switch (pi->cmd) {
2270                 case P_DATA_REQUEST:
2271                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2272                         break;
2273                 case P_RS_DATA_REQUEST:
2274                 case P_CSUM_RS_REQUEST:
2275                 case P_OV_REQUEST:
2276                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2277                         break;
2278                 case P_OV_REPLY:
2279                         verb = 0;
2280                         dec_rs_pending(mdev);
2281                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2282                         break;
2283                 default:
2284                         BUG();
2285                 }
2286                 if (verb && __ratelimit(&drbd_ratelimit_state))
2287                         dev_err(DEV, "Can not satisfy peer's read request, "
2288                             "no local data.\n");
2289
2290                 /* drain the payload, if any */
2291                 return drbd_drain_block(mdev, pi->size);
2292         }
2293
2294         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2295          * "criss-cross" setup, that might cause write-out on some other DRBD,
2296          * which in turn might block on the other node at this very place.  */
2297         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2298         if (!peer_req) {
2299                 put_ldev(mdev);
2300                 return -ENOMEM;
2301         }
2302
2303         switch (pi->cmd) {
2304         case P_DATA_REQUEST:
2305                 peer_req->w.cb = w_e_end_data_req;
2306                 fault_type = DRBD_FAULT_DT_RD;
2307                 /* application IO, don't drbd_rs_begin_io */
2308                 goto submit;
2309
2310         case P_RS_DATA_REQUEST:
2311                 peer_req->w.cb = w_e_end_rsdata_req;
2312                 fault_type = DRBD_FAULT_RS_RD;
2313                 /* used in the sector offset progress display */
2314                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2315                 break;
2316
2317         case P_OV_REPLY:
2318         case P_CSUM_RS_REQUEST:
2319                 fault_type = DRBD_FAULT_RS_RD;
2320                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2321                 if (!di)
2322                         goto out_free_e;
2323
2324                 di->digest_size = pi->size;
2325                 di->digest = (((char *)di)+sizeof(struct digest_info));
2326
2327                 peer_req->digest = di;
2328                 peer_req->flags |= EE_HAS_DIGEST;
2329
2330                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2331                         goto out_free_e;
2332
2333                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2334                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2335                         peer_req->w.cb = w_e_end_csum_rs_req;
2336                         /* used in the sector offset progress display */
2337                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2338                 } else if (pi->cmd == P_OV_REPLY) {
2339                         /* track progress, we may need to throttle */
2340                         atomic_add(size >> 9, &mdev->rs_sect_in);
2341                         peer_req->w.cb = w_e_end_ov_reply;
2342                         dec_rs_pending(mdev);
2343                         /* drbd_rs_begin_io done when we sent this request,
2344                          * but accounting still needs to be done. */
2345                         goto submit_for_resync;
2346                 }
2347                 break;
2348
2349         case P_OV_REQUEST:
2350                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2351                     mdev->tconn->agreed_pro_version >= 90) {
2352                         unsigned long now = jiffies;
2353                         int i;
2354                         mdev->ov_start_sector = sector;
2355                         mdev->ov_position = sector;
2356                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2357                         mdev->rs_total = mdev->ov_left;
2358                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2359                                 mdev->rs_mark_left[i] = mdev->ov_left;
2360                                 mdev->rs_mark_time[i] = now;
2361                         }
2362                         dev_info(DEV, "Online Verify start sector: %llu\n",
2363                                         (unsigned long long)sector);
2364                 }
2365                 peer_req->w.cb = w_e_end_ov_req;
2366                 fault_type = DRBD_FAULT_RS_RD;
2367                 break;
2368
2369         default:
2370                 BUG();
2371         }
2372
2373         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2374          * with respect to the receiver, but that is not as straightforward as it
2375          * may seem.  Various places in the resync start and stop logic assume that
2376          * resync requests are processed in order; requeuing this on the worker
2377          * thread would introduce a bunch of new code to synchronize the threads.
2378          *
2379          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2380          * "forever", throttling after drbd_rs_begin_io will lock that extent
2381          * for application writes for the same time.  For now, just throttle
2382          * here, where the rest of the code expects the receiver to sleep for
2383          * a while, anyways.
2384          */
2385
2386         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2387          * this defers syncer requests for some time, before letting at least
2388          * one request through.  The resync controller on the receiving side
2389          * will adapt to the incoming rate accordingly.
2390          *
2391          * We cannot throttle here if remote is Primary/SyncTarget:
2392          * we would also throttle its application reads.
2393          * In that case, throttling is done on the SyncTarget only.
2394          */
2395         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2396                 schedule_timeout_uninterruptible(HZ/10);
2397         if (drbd_rs_begin_io(mdev, sector))
2398                 goto out_free_e;
2399
2400 submit_for_resync:
2401         atomic_add(size >> 9, &mdev->rs_sect_ev);
2402
2403 submit:
2404         inc_unacked(mdev);
2405         spin_lock_irq(&mdev->tconn->req_lock);
2406         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2407         spin_unlock_irq(&mdev->tconn->req_lock);
2408
2409         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2410                 return 0;
2411
2412         /* don't care for the reason here */
2413         dev_err(DEV, "submit failed, triggering re-connect\n");
2414         spin_lock_irq(&mdev->tconn->req_lock);
2415         list_del(&peer_req->w.list);
2416         spin_unlock_irq(&mdev->tconn->req_lock);
2417         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2418
2419 out_free_e:
2420         put_ldev(mdev);
2421         drbd_free_peer_req(mdev, peer_req);
2422         return -EIO;
2423 }
2424
2425 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2426 {
2427         int self, peer, rv = -100;
2428         unsigned long ch_self, ch_peer;
2429         enum drbd_after_sb_p after_sb_0p;
2430
2431         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2432         peer = mdev->p_uuid[UI_BITMAP] & 1;
2433
2434         ch_peer = mdev->p_uuid[UI_SIZE];
2435         ch_self = mdev->comm_bm_set;
2436
2437         rcu_read_lock();
2438         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2439         rcu_read_unlock();
2440         switch (after_sb_0p) {
2441         case ASB_CONSENSUS:
2442         case ASB_DISCARD_SECONDARY:
2443         case ASB_CALL_HELPER:
2444         case ASB_VIOLENTLY:
2445                 dev_err(DEV, "Configuration error.\n");
2446                 break;
2447         case ASB_DISCONNECT:
2448                 break;
2449         case ASB_DISCARD_YOUNGER_PRI:
2450                 if (self == 0 && peer == 1) {
2451                         rv = -1;
2452                         break;
2453                 }
2454                 if (self == 1 && peer == 0) {
2455                         rv =  1;
2456                         break;
2457                 }
2458                 /* Else fall through to one of the other strategies... */
2459         case ASB_DISCARD_OLDER_PRI:
2460                 if (self == 0 && peer == 1) {
2461                         rv = 1;
2462                         break;
2463                 }
2464                 if (self == 1 && peer == 0) {
2465                         rv = -1;
2466                         break;
2467                 }
2468                 /* Else fall through to one of the other strategies... */
2469                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2470                      "Using discard-least-changes instead\n");
2471         case ASB_DISCARD_ZERO_CHG:
2472                 if (ch_peer == 0 && ch_self == 0) {
2473                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2474                                 ? -1 : 1;
2475                         break;
2476                 } else {
2477                         if (ch_peer == 0) { rv =  1; break; }
2478                         if (ch_self == 0) { rv = -1; break; }
2479                 }
2480                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2481                         break;
2482         case ASB_DISCARD_LEAST_CHG:
2483                 if      (ch_self < ch_peer)
2484                         rv = -1;
2485                 else if (ch_self > ch_peer)
2486                         rv =  1;
2487                 else /* ( ch_self == ch_peer ) */
2488                      /* Well, then use something else. */
2489                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2490                                 ? -1 : 1;
2491                 break;
2492         case ASB_DISCARD_LOCAL:
2493                 rv = -1;
2494                 break;
2495         case ASB_DISCARD_REMOTE:
2496                 rv =  1;
2497         }
2498
2499         return rv;
2500 }
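
/*
 * Editor's note, worked example for the switch above (invented numbers):
 * with after-sb-0pri set to discard-least-changes, ch_self = 100 changed
 * blocks and ch_peer = 5000, ch_self < ch_peer yields rv = -1, i.e. the
 * local data is discarded and this node becomes the sync target.
 */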
2501
2502 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2503 {
2504         int hg, rv = -100;
2505         enum drbd_after_sb_p after_sb_1p;
2506
2507         rcu_read_lock();
2508         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2509         rcu_read_unlock();
2510         switch (after_sb_1p) {
2511         case ASB_DISCARD_YOUNGER_PRI:
2512         case ASB_DISCARD_OLDER_PRI:
2513         case ASB_DISCARD_LEAST_CHG:
2514         case ASB_DISCARD_LOCAL:
2515         case ASB_DISCARD_REMOTE:
2516         case ASB_DISCARD_ZERO_CHG:
2517                 dev_err(DEV, "Configuration error.\n");
2518                 break;
2519         case ASB_DISCONNECT:
2520                 break;
2521         case ASB_CONSENSUS:
2522                 hg = drbd_asb_recover_0p(mdev);
2523                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2524                         rv = hg;
2525                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2526                         rv = hg;
2527                 break;
2528         case ASB_VIOLENTLY:
2529                 rv = drbd_asb_recover_0p(mdev);
2530                 break;
2531         case ASB_DISCARD_SECONDARY:
2532                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2533         case ASB_CALL_HELPER:
2534                 hg = drbd_asb_recover_0p(mdev);
2535                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2536                         enum drbd_state_rv rv2;
2537
2538                         drbd_set_role(mdev, R_SECONDARY, 0);
2539                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2540                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2541                           * we do not need to wait for the after state change work either. */
2542                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2543                         if (rv2 != SS_SUCCESS) {
2544                                 drbd_khelper(mdev, "pri-lost-after-sb");
2545                         } else {
2546                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2547                                 rv = hg;
2548                         }
2549                 } else
2550                         rv = hg;
2551         }
2552
2553         return rv;
2554 }
2555
2556 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2557 {
2558         int hg, rv = -100;
2559         enum drbd_after_sb_p after_sb_2p;
2560
2561         rcu_read_lock();
2562         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2563         rcu_read_unlock();
2564         switch (after_sb_2p) {
2565         case ASB_DISCARD_YOUNGER_PRI:
2566         case ASB_DISCARD_OLDER_PRI:
2567         case ASB_DISCARD_LEAST_CHG:
2568         case ASB_DISCARD_LOCAL:
2569         case ASB_DISCARD_REMOTE:
2570         case ASB_CONSENSUS:
2571         case ASB_DISCARD_SECONDARY:
2572         case ASB_DISCARD_ZERO_CHG:
2573                 dev_err(DEV, "Configuration error.\n");
2574                 break;
2575         case ASB_VIOLENTLY:
2576                 rv = drbd_asb_recover_0p(mdev);
2577                 break;
2578         case ASB_DISCONNECT:
2579                 break;
2580         case ASB_CALL_HELPER:
2581                 hg = drbd_asb_recover_0p(mdev);
2582                 if (hg == -1) {
2583                         enum drbd_state_rv rv2;
2584
2585                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2586                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2587                           * we do not need to wait for the after state change work either. */
2588                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2589                         if (rv2 != SS_SUCCESS) {
2590                                 drbd_khelper(mdev, "pri-lost-after-sb");
2591                         } else {
2592                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2593                                 rv = hg;
2594                         }
2595                 } else
2596                         rv = hg;
2597         }
2598
2599         return rv;
2600 }
2601
2602 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2603                            u64 bits, u64 flags)
2604 {
2605         if (!uuid) {
2606                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2607                 return;
2608         }
2609         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2610              text,
2611              (unsigned long long)uuid[UI_CURRENT],
2612              (unsigned long long)uuid[UI_BITMAP],
2613              (unsigned long long)uuid[UI_HISTORY_START],
2614              (unsigned long long)uuid[UI_HISTORY_END],
2615              (unsigned long long)bits,
2616              (unsigned long long)flags);
2617 }
2618
2619 /*
2620   100   after split brain try auto recover
2621     2   C_SYNC_SOURCE set BitMap
2622     1   C_SYNC_SOURCE use BitMap
2623     0   no Sync
2624    -1   C_SYNC_TARGET use BitMap
2625    -2   C_SYNC_TARGET set BitMap
2626  -100   after split brain, disconnect
2627 -1000   unrelated data
2628 -1091   requires proto 91
2629 -1096   requires proto 96
2630  */
2631 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2632 {
2633         u64 self, peer;
2634         int i, j;
2635
2636         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2637         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2638
2639         *rule_nr = 10;
2640         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2641                 return 0;
2642
2643         *rule_nr = 20;
2644         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2645              peer != UUID_JUST_CREATED)
2646                 return -2;
2647
2648         *rule_nr = 30;
2649         if (self != UUID_JUST_CREATED &&
2650             (peer == UUID_JUST_CREATED || peer == (u64)0))
2651                 return 2;
2652
2653         if (self == peer) {
2654                 int rct, dc; /* roles at crash time */
2655
2656                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2657
2658                         if (mdev->tconn->agreed_pro_version < 91)
2659                                 return -1091;
2660
2661                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2662                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2663                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2664                                 drbd_uuid_set_bm(mdev, 0UL);
2665
2666                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2667                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2668                                 *rule_nr = 34;
2669                         } else {
2670                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2671                                 *rule_nr = 36;
2672                         }
2673
2674                         return 1;
2675                 }
2676
2677                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2678
2679                         if (mdev->tconn->agreed_pro_version < 91)
2680                                 return -1091;
2681
2682                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2683                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2684                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2685
2686                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2687                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2688                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2689
2690                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2691                                 *rule_nr = 35;
2692                         } else {
2693                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2694                                 *rule_nr = 37;
2695                         }
2696
2697                         return -1;
2698                 }
2699
2700                 /* Common power [off|failure] */
2701                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2702                         (mdev->p_uuid[UI_FLAGS] & 2);
2703                 /* lowest bit is set when we were primary,
2704                  * next bit (weight 2) is set when peer was primary */
2705                 *rule_nr = 40;
2706
2707                 switch (rct) {
2708                 case 0: /* !self_pri && !peer_pri */ return 0;
2709                 case 1: /*  self_pri && !peer_pri */ return 1;
2710                 case 2: /* !self_pri &&  peer_pri */ return -1;
2711                 case 3: /*  self_pri &&  peer_pri */
2712                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2713                         return dc ? -1 : 1;
2714                 }
2715         }
2716
2717         *rule_nr = 50;
2718         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2719         if (self == peer)
2720                 return -1;
2721
2722         *rule_nr = 51;
2723         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2724         if (self == peer) {
2725                 if (mdev->tconn->agreed_pro_version < 96 ?
2726                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2727                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2728                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2729                         /* The last P_SYNC_UUID did not get through. Undo the last
2730                            "start of resync as sync source" modifications of the peer's UUIDs. */
2731
2732                         if (mdev->tconn->agreed_pro_version < 91)
2733                                 return -1091;
2734
2735                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2736                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2737
2738                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2739                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2740
2741                         return -1;
2742                 }
2743         }
2744
2745         *rule_nr = 60;
2746         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2747         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2748                 peer = mdev->p_uuid[i] & ~((u64)1);
2749                 if (self == peer)
2750                         return -2;
2751         }
2752
2753         *rule_nr = 70;
2754         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2755         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2756         if (self == peer)
2757                 return 1;
2758
2759         *rule_nr = 71;
2760         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2761         if (self == peer) {
2762                 if (mdev->tconn->agreed_pro_version < 96 ?
2763                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2764                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2765                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2766                         /* The last P_SYNC_UUID did not get through. Undo the last
2767                            "start of resync as sync source" modifications of our UUIDs. */
2768
2769                         if (mdev->tconn->agreed_pro_version < 91)
2770                                 return -1091;
2771
2772                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2773                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2774
2775                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2776                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2777                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2778
2779                         return 1;
2780                 }
2781         }
2782
2783
2784         *rule_nr = 80;
2785         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2786         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2787                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2788                 if (self == peer)
2789                         return 2;
2790         }
2791
2792         *rule_nr = 90;
2793         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2794         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2795         if (self == peer && self != ((u64)0))
2796                 return 100;
2797
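             /* rule 100: a history UUID of ours matches one of the peer's history
              * UUIDs: the data is related, but this is still a split brain (-100). */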
2798         *rule_nr = 100;
2799         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2800                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2801                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2802                         peer = mdev->p_uuid[j] & ~((u64)1);
2803                         if (self == peer)
2804                                 return -100;
2805                 }
2806         }
2807
2808         return -1000;
2809 }
2810
2811 /* drbd_sync_handshake() returns the new conn state on success, or
2812    C_MASK on failure.
2813  */
2814 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2815                                            enum drbd_disk_state peer_disk) __must_hold(local)
2816 {
2817         enum drbd_conns rv = C_MASK;
2818         enum drbd_disk_state mydisk;
2819         struct net_conf *nc;
2820         int hg, rule_nr, rr_conflict, dry_run;
2821
2822         mydisk = mdev->state.disk;
2823         if (mydisk == D_NEGOTIATING)
2824                 mydisk = mdev->new_state_tmp.disk;
2825
2826         dev_info(DEV, "drbd_sync_handshake:\n");
2827         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2828         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2829                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2830
2831         hg = drbd_uuid_compare(mdev, &rule_nr);
2832
2833         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2834
2835         if (hg == -1000) {
2836                 dev_alert(DEV, "Unrelated data, aborting!\n");
2837                 return C_MASK;
2838         }
2839         if (hg < -1000) {
2840                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2841                 return C_MASK;
2842         }
2843
2844         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2845             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2846                 int f = (hg == -100) || abs(hg) == 2;
2847                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2848                 if (f)
2849                         hg = hg*2;
2850                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2851                      hg > 0 ? "source" : "target");
2852         }
2853
2854         if (abs(hg) == 100)
2855                 drbd_khelper(mdev, "initial-split-brain");
2856
2857         rcu_read_lock();
2858         nc = rcu_dereference(mdev->tconn->net_conf);
2859
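             /* net_conf is RCU protected: dereference it only while holding
              * rcu_read_lock(), and copy plain values (rr_conflict, dry_run)
              * out of it before rcu_read_unlock() below */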
2860         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2861                 int pcount = (mdev->state.role == R_PRIMARY)
2862                            + (peer_role == R_PRIMARY);
2863                 int forced = (hg == -100);
2864
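                     /* the after-split-brain recovery policy depends on how many
                      * of the two nodes are currently Primary */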
2865                 switch (pcount) {
2866                 case 0:
2867                         hg = drbd_asb_recover_0p(mdev);
2868                         break;
2869                 case 1:
2870                         hg = drbd_asb_recover_1p(mdev);
2871                         break;
2872                 case 2:
2873                         hg = drbd_asb_recover_2p(mdev);
2874                         break;
2875                 }
2876                 if (abs(hg) < 100) {
2877                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2878                              "automatically solved. Sync from %s node\n",
2879                              pcount, (hg < 0) ? "peer" : "this");
2880                         if (forced) {
2881                                 dev_warn(DEV, "Doing a full sync, since"
2882                                      " UUIDs were ambiguous.\n");
2883                                 hg = hg*2;
2884                         }
2885                 }
2886         }
2887
2888         if (hg == -100) {
2889                 if (nc->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2890                         hg = -1;
2891                 if (!nc->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2892                         hg = 1;
2893
2894                 if (abs(hg) < 100)
2895                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2896                              "Sync from %s node\n",
2897                              (hg < 0) ? "peer" : "this");
2898         }
2899         rr_conflict = nc->rr_conflict;
2900         dry_run = nc->dry_run;
2901         rcu_read_unlock();
2902
2903         if (hg == -100) {
2904                 /* FIXME this log message is not correct if we end up here
2905                  * after an attempted attach on a diskless node.
2906                  * We just refuse to attach -- well, we drop the "connection"
2907                  * to that disk, in a way... */
2908                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2909                 drbd_khelper(mdev, "split-brain");
2910                 return C_MASK;
2911         }
2912
2913         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2914                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2915                 return C_MASK;
2916         }
2917
2918         if (hg < 0 && /* by intention we do not use mydisk here. */
2919             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2920                 switch (rr_conflict) {
2921                 case ASB_CALL_HELPER:
2922                         drbd_khelper(mdev, "pri-lost");
2923                         /* fall through */
2924                 case ASB_DISCONNECT:
2925                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2926                         return C_MASK;
2927                 case ASB_VIOLENTLY:
2928                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2929                              " assumption\n");
2930                 }
2931         }
2932
2933         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2934                 if (hg == 0)
2935                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2936                 else
2937                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2938                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2939                                  abs(hg) >= 2 ? "full" : "bit-map based");
2940                 return C_MASK;
2941         }
2942
2943         if (abs(hg) >= 2) {
2944                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2945                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2946                                         BM_LOCKED_SET_ALLOWED))
2947                         return C_MASK;
2948         }
2949
2950         if (hg > 0) { /* become sync source. */
2951                 rv = C_WF_BITMAP_S;
2952         } else if (hg < 0) { /* become sync target */
2953                 rv = C_WF_BITMAP_T;
2954         } else {
2955                 rv = C_CONNECTED;
2956                 if (drbd_bm_total_weight(mdev)) {
2957                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2958                              drbd_bm_total_weight(mdev));
2959                 }
2960         }
2961
2962         return rv;
2963 }
2964
2965 /* returns 1 if invalid */
2966 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2967 {
2968         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2969         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2970             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2971                 return 0;
2972
2973         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2974         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2975             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2976                 return 1;
2977
2978         /* everything else is valid if they are equal on both sides. */
2979         if (peer == self)
2980                 return 0;
2981
2982         /* everything else is invalid. */
2983         return 1;
2984 }
2985
2986 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2987 {
2988         struct p_protocol *p = pi->data;
2989         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2990         int p_want_lose, p_two_primaries, cf;
2991         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2992         unsigned char *my_alg;
2993         struct net_conf *nc;
2994
2995         p_proto         = be32_to_cpu(p->protocol);
2996         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2997         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2998         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2999         p_two_primaries = be32_to_cpu(p->two_primaries);
3000         cf              = be32_to_cpu(p->conn_flags);
3001         p_want_lose = cf & CF_WANT_LOSE;
3002
3003         clear_bit(CONN_DRY_RUN, &tconn->flags);
3004
3005         if (cf & CF_DRY_RUN)
3006                 set_bit(CONN_DRY_RUN, &tconn->flags);
3007
3008         rcu_read_lock();
3009         nc = rcu_dereference(tconn->net_conf);
3010
3011         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3012                 conn_err(tconn, "incompatible communication protocols\n");
3013                 goto disconnect_rcu_unlock;
3014         }
3015
3016         if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
3017                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3018                 goto disconnect_rcu_unlock;
3019         }
3020
3021         if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
3022                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3023                 goto disconnect_rcu_unlock;
3024         }
3025
3026         if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
3027                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3028                 goto disconnect_rcu_unlock;
3029         }
3030
3031         if (p_want_lose && nc->want_lose) {
3032                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
3033                 goto disconnect_rcu_unlock;
3034         }
3035
3036         if (p_two_primaries != nc->two_primaries) {
3037                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3038                 goto disconnect_rcu_unlock;
3039         }
3040
3041         my_alg = nc->integrity_alg;
3042         rcu_read_unlock();
3043
3044         if (tconn->agreed_pro_version >= 87) {
3045                 int err;
3046
3047                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
3048                 if (err)
3049                         return err;
3050
3051                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
3052                 if (strcmp(p_integrity_alg, my_alg)) {
3053                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
3054                         goto disconnect;
3055                 }
3056                 conn_info(tconn, "data-integrity-alg: %s\n",
3057                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
3058         }
3059
3060         return 0;
3061
3062 disconnect_rcu_unlock:
3063         rcu_read_unlock();
3064 disconnect:
3065         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3066         return -EIO;
3067 }
3068
3069 /* helper function
3070  * input: alg name, feature name
3071  * return: NULL (alg name was "")
3072  *         ERR_PTR(error) if something goes wrong
3073  *         or the crypto hash ptr, if it worked out ok. */
3074 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3075                 const char *alg, const char *name)
3076 {
3077         struct crypto_hash *tfm;
3078
3079         if (!alg[0])
3080                 return NULL;
3081
3082         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3083         if (IS_ERR(tfm)) {
3084                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3085                         alg, name, PTR_ERR(tfm));
3086                 return tfm;
3087         }
3088         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3089                 crypto_free_hash(tfm);
3090                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3091                 return ERR_PTR(-EINVAL);
3092         }
3093         return tfm;
3094 }
3095
3096 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3097 {
3098         void *buffer = tconn->data.rbuf;
3099         int size = pi->size;
3100
3101         while (size) {
3102                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3103                 s = drbd_recv(tconn, buffer, s);
3104                 if (s <= 0) {
3105                         if (s < 0)
3106                                 return s;
3107                         break;
3108                 }
3109                 size -= s;
3110         }
3111         if (size)
3112                 return -EIO;
3113         return 0;
3114 }
3115
3116 /*
3117  * config_unknown_volume  -  device configuration command for unknown volume
3118  *
3119  * When a device is added to an existing connection, the node on which the
3120  * device is added first will send configuration commands to its peer but the
3121  * peer will not know about the device yet.  It will warn and ignore these
3122  * commands.  Once the device is added on the second node, the second node will
3123  * send the same device configuration commands, but in the other direction.
3124  *
3125  * (We can also end up here if drbd is misconfigured.)
3126  */
3127 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3128 {
3129         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3130                   pi->vnr, cmdname(pi->cmd));
3131         return ignore_remaining_packet(tconn, pi);
3132 }
3133
3134 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3135 {
3136         struct drbd_conf *mdev;
3137         struct p_rs_param_95 *p;
3138         unsigned int header_size, data_size, exp_max_sz;
3139         struct crypto_hash *verify_tfm = NULL;
3140         struct crypto_hash *csums_tfm = NULL;
3141         const int apv = tconn->agreed_pro_version;
3142         int *rs_plan_s = NULL;
3143         int fifo_size = 0;
3144         int err;
3145
3146         mdev = vnr_to_mdev(tconn, pi->vnr);
3147         if (!mdev)
3148                 return config_unknown_volume(tconn, pi);
3149
3150         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3151                     : apv == 88 ? sizeof(struct p_rs_param)
3152                                         + SHARED_SECRET_MAX
3153                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3154                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3155
3156         if (pi->size > exp_max_sz) {
3157                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3158                     pi->size, exp_max_sz);
3159                 return -EIO;
3160         }
3161
3162         if (apv <= 88) {
3163                 header_size = sizeof(struct p_rs_param);
3164                 data_size = pi->size - header_size;
3165         } else if (apv <= 94) {
3166                 header_size = sizeof(struct p_rs_param_89);
3167                 data_size = pi->size - header_size;
3168                 D_ASSERT(data_size == 0);
3169         } else {
3170                 header_size = sizeof(struct p_rs_param_95);
3171                 data_size = pi->size - header_size;
3172                 D_ASSERT(data_size == 0);
3173         }
3174
3175         /* initialize verify_alg and csums_alg */
3176         p = pi->data;
3177         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3178
3179         err = drbd_recv_all(mdev->tconn, p, header_size);
3180         if (err)
3181                 return err;
3182
3183         if (get_ldev(mdev)) {
3184                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3185                 put_ldev(mdev);
3186         }
3187
3188         if (apv >= 88) {
3189                 if (apv == 88) {
3190                         if (data_size > SHARED_SECRET_MAX) {
3191                                 dev_err(DEV, "verify-alg too long, "
3192                                     "peer wants %u, accepting only %u bytes\n",
3193                                                 data_size, SHARED_SECRET_MAX);
3194                                 return -EIO;
3195                         }
3196
3197                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3198                         if (err)
3199                                 return err;
3200
3201                         /* we expect NUL terminated string */
3202                         /* but just in case someone tries to be evil */
3203                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3204                         p->verify_alg[data_size-1] = 0;
3205
3206                 } else /* apv >= 89 */ {
3207                         /* we still expect NUL terminated strings */
3208                         /* but just in case someone tries to be evil */
3209                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3210                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3211                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3212                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3213                 }
3214
3215                 if (strcmp(mdev->tconn->net_conf->verify_alg, p->verify_alg)) {
3216                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3217                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3218                                     mdev->tconn->net_conf->verify_alg, p->verify_alg);
3219                                 goto disconnect;
3220                         }
3221                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3222                                         p->verify_alg, "verify-alg");
3223                         if (IS_ERR(verify_tfm)) {
3224                                 verify_tfm = NULL;
3225                                 goto disconnect;
3226                         }
3227                 }
3228
3229                 if (apv >= 89 && strcmp(mdev->tconn->net_conf->csums_alg, p->csums_alg)) {
3230                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3231                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3232                                     mdev->tconn->net_conf->csums_alg, p->csums_alg);
3233                                 goto disconnect;
3234                         }
3235                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3236                                         p->csums_alg, "csums-alg");
3237                         if (IS_ERR(csums_tfm)) {
3238                                 csums_tfm = NULL;
3239                                 goto disconnect;
3240                         }
3241                 }
3242
3243                 if (apv > 94 && get_ldev(mdev)) {
3244                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3245                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3246                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3247                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3248                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3249
3250                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3251                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3252                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3253                                 if (!rs_plan_s) {
3254                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3255                                         put_ldev(mdev);
3256                                         goto disconnect;
3257                                 }
3258                         }
3259                         put_ldev(mdev);
3260                 }
3261
3262                 spin_lock(&mdev->peer_seq_lock);
3263                 /* lock against drbd_nl_syncer_conf() */
3264                 if (verify_tfm) {
3265                         strcpy(mdev->tconn->net_conf->verify_alg, p->verify_alg);
3266                         mdev->tconn->net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3267                         crypto_free_hash(mdev->tconn->verify_tfm);
3268                         mdev->tconn->verify_tfm = verify_tfm;
3269                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3270                 }
3271                 if (csums_tfm) {
3272                         strcpy(mdev->tconn->net_conf->csums_alg, p->csums_alg);
3273                         mdev->tconn->net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3274                         crypto_free_hash(mdev->tconn->csums_tfm);
3275                         mdev->tconn->csums_tfm = csums_tfm;
3276                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3277                 }
3278                 if (fifo_size != mdev->rs_plan_s.size) {
3279                         kfree(mdev->rs_plan_s.values);
3280                         mdev->rs_plan_s.values = rs_plan_s;
3281                         mdev->rs_plan_s.size   = fifo_size;
3282                         mdev->rs_planed = 0;
3283                 }
3284                 spin_unlock(&mdev->peer_seq_lock);
3285         }
3286         return 0;
3287
3288 disconnect:
3289         /* just for completeness: actually not needed,
3290          * as this is not reached if csums_tfm was ok. */
3291         crypto_free_hash(csums_tfm);
3292         /* but free the verify_tfm again, if csums_tfm did not work out */
3293         crypto_free_hash(verify_tfm);
3294         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3295         return -EIO;
3296 }
3297
3298 /* warn if the arguments differ by more than 12.5% */
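     /* (a>>3 is a/8, so the warning triggers when |a - b| exceeds 1/8th of either value) */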
3299 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3300         const char *s, sector_t a, sector_t b)
3301 {
3302         sector_t d;
3303         if (a == 0 || b == 0)
3304                 return;
3305         d = (a > b) ? (a - b) : (b - a);
3306         if (d > (a>>3) || d > (b>>3))
3307                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3308                      (unsigned long long)a, (unsigned long long)b);
3309 }
3310
3311 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3312 {
3313         struct drbd_conf *mdev;
3314         struct p_sizes *p = pi->data;
3315         enum determine_dev_size dd = unchanged;
3316         sector_t p_size, p_usize, my_usize;
3317         int ldsc = 0; /* local disk size changed */
3318         enum dds_flags ddsf;
3319
3320         mdev = vnr_to_mdev(tconn, pi->vnr);
3321         if (!mdev)
3322                 return config_unknown_volume(tconn, pi);
3323
3324         p_size = be64_to_cpu(p->d_size);
3325         p_usize = be64_to_cpu(p->u_size);
3326
3327         /* just store the peer's disk size for now.
3328          * we still need to figure out whether we accept that. */
3329         mdev->p_size = p_size;
3330
3331         if (get_ldev(mdev)) {
3332                 warn_if_differ_considerably(mdev, "lower level device sizes",
3333                            p_size, drbd_get_max_capacity(mdev->ldev));
3334                 warn_if_differ_considerably(mdev, "user requested size",
3335                                             p_usize, mdev->ldev->dc.disk_size);
3336
3337                 /* if this is the first connect, or an otherwise expected
3338                  * param exchange, choose the minimum */
3339                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3340                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3341                                              p_usize);
3342
3343                 my_usize = mdev->ldev->dc.disk_size;
3344
3345                 if (mdev->ldev->dc.disk_size != p_usize) {
3346                         mdev->ldev->dc.disk_size = p_usize;
3347                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3348                              (unsigned long)mdev->ldev->dc.disk_size);
3349                 }
3350
3351                 /* Never shrink a device with usable data during connect.
3352                    But allow online shrinking if we are connected. */
3353                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3354                    drbd_get_capacity(mdev->this_bdev) &&
3355                    mdev->state.disk >= D_OUTDATED &&
3356                    mdev->state.conn < C_CONNECTED) {
3357                         dev_err(DEV, "The peer's disk size is too small!\n");
3358                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3359                         mdev->ldev->dc.disk_size = my_usize;
3360                         put_ldev(mdev);
3361                         return -EIO;
3362                 }
3363                 put_ldev(mdev);
3364         }
3365
3366         ddsf = be16_to_cpu(p->dds_flags);
3367         if (get_ldev(mdev)) {
3368                 dd = drbd_determine_dev_size(mdev, ddsf);
3369                 put_ldev(mdev);
3370                 if (dd == dev_size_error)
3371                         return -EIO;
3372                 drbd_md_sync(mdev);
3373         } else {
3374                 /* I am diskless, need to accept the peer's size. */
3375                 drbd_set_my_capacity(mdev, p_size);
3376         }
3377
3378         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3379         drbd_reconsider_max_bio_size(mdev);
3380
3381         if (get_ldev(mdev)) {
3382                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3383                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3384                         ldsc = 1;
3385                 }
3386
3387                 put_ldev(mdev);
3388         }
3389
3390         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3391                 if (be64_to_cpu(p->c_size) !=
3392                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3393                         /* we have different sizes, probably the peer
3394                          * needs to know my new size... */
3395                         drbd_send_sizes(mdev, 0, ddsf);
3396                 }
3397                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3398                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3399                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3400                             mdev->state.disk >= D_INCONSISTENT) {
3401                                 if (ddsf & DDSF_NO_RESYNC)
3402                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3403                                 else
3404                                         resync_after_online_grow(mdev);
3405                         } else
3406                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3407                 }
3408         }
3409
3410         return 0;
3411 }
3412
3413 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3414 {
3415         struct drbd_conf *mdev;
3416         struct p_uuids *p = pi->data;
3417         u64 *p_uuid;
3418         int i, updated_uuids = 0;
3419
3420         mdev = vnr_to_mdev(tconn, pi->vnr);
3421         if (!mdev)
3422                 return config_unknown_volume(tconn, pi);
3423
3424         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3425
3426         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3427                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3428
3429         kfree(mdev->p_uuid);
3430         mdev->p_uuid = p_uuid;
3431
3432         if (mdev->state.conn < C_CONNECTED &&
3433             mdev->state.disk < D_INCONSISTENT &&
3434             mdev->state.role == R_PRIMARY &&
3435             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3436                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3437                     (unsigned long long)mdev->ed_uuid);
3438                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3439                 return -EIO;
3440         }
3441
3442         if (get_ldev(mdev)) {
3443                 int skip_initial_sync =
3444                         mdev->state.conn == C_CONNECTED &&
3445                         mdev->tconn->agreed_pro_version >= 90 &&
3446                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3447                         (p_uuid[UI_FLAGS] & 8);
3448                 if (skip_initial_sync) {
3449                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3450                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3451                                         "clear_n_write from receive_uuids",
3452                                         BM_LOCKED_TEST_ALLOWED);
3453                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3454                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3455                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3456                                         CS_VERBOSE, NULL);
3457                         drbd_md_sync(mdev);
3458                         updated_uuids = 1;
3459                 }
3460                 put_ldev(mdev);
3461         } else if (mdev->state.disk < D_INCONSISTENT &&
3462                    mdev->state.role == R_PRIMARY) {
3463                 /* I am a diskless primary, the peer just created a new current UUID
3464                    for me. */
3465                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3466         }
3467
3468         /* Before we test for the disk state, we should wait until a possibly
3469            ongoing cluster-wide state change is finished. That is important if
3470            we are primary and are detaching from our disk. We need to see the
3471            new disk state... */
3472         mutex_lock(mdev->state_mutex);
3473         mutex_unlock(mdev->state_mutex);
3474         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3475                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3476
3477         if (updated_uuids)
3478                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3479
3480         return 0;
3481 }
3482
3483 /**
3484  * convert_state() - Converts the peer's view of the cluster state to our point of view
3485  * @ps:         The state as seen by the peer.
3486  */
3487 static union drbd_state convert_state(union drbd_state ps)
3488 {
3489         union drbd_state ms;
3490
3491         static enum drbd_conns c_tab[] = {
3492                 [C_CONNECTED] = C_CONNECTED,
3493
3494                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3495                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3496                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3497                 [C_VERIFY_S]       = C_VERIFY_T,
3498                 [C_MASK]   = C_MASK,
3499         };
3500
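             /* mirror the asymmetric fields: the peer's role/disk become our
              * view of peer/pdsk and vice versa; directional connection states
              * are translated via c_tab above */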
3501         ms.i = ps.i;
3502
3503         ms.conn = c_tab[ps.conn];
3504         ms.peer = ps.role;
3505         ms.role = ps.peer;
3506         ms.pdsk = ps.disk;
3507         ms.disk = ps.pdsk;
3508         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3509
3510         return ms;
3511 }
3512
3513 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3514 {
3515         struct drbd_conf *mdev;
3516         struct p_req_state *p = pi->data;
3517         union drbd_state mask, val;
3518         enum drbd_state_rv rv;
3519
3520         mdev = vnr_to_mdev(tconn, pi->vnr);
3521         if (!mdev)
3522                 return -EIO;
3523
3524         mask.i = be32_to_cpu(p->mask);
3525         val.i = be32_to_cpu(p->val);
3526
3527         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3528             mutex_is_locked(mdev->state_mutex)) {
3529                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3530                 return 0;
3531         }
3532
3533         mask = convert_state(mask);
3534         val = convert_state(val);
3535
3536         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3537         drbd_send_sr_reply(mdev, rv);
3538
3539         drbd_md_sync(mdev);
3540
3541         return 0;
3542 }
3543
3544 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3545 {
3546         struct p_req_state *p = pi->data;
3547         union drbd_state mask, val;
3548         enum drbd_state_rv rv;
3549
3550         mask.i = be32_to_cpu(p->mask);
3551         val.i = be32_to_cpu(p->val);
3552
3553         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3554             mutex_is_locked(&tconn->cstate_mutex)) {
3555                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3556                 return 0;
3557         }
3558
3559         mask = convert_state(mask);
3560         val = convert_state(val);
3561
3562         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3563         conn_send_sr_reply(tconn, rv);
3564
3565         return 0;
3566 }
3567
3568 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3569 {
3570         struct drbd_conf *mdev;
3571         struct p_state *p = pi->data;
3572         union drbd_state os, ns, peer_state;
3573         enum drbd_disk_state real_peer_disk;
3574         enum chg_state_flags cs_flags;
3575         int rv;
3576
3577         mdev = vnr_to_mdev(tconn, pi->vnr);
3578         if (!mdev)
3579                 return config_unknown_volume(tconn, pi);
3580
3581         peer_state.i = be32_to_cpu(p->state);
3582
3583         real_peer_disk = peer_state.disk;
3584         if (peer_state.disk == D_NEGOTIATING) {
3585                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3586                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3587         }
3588
3589         spin_lock_irq(&mdev->tconn->req_lock);
3590  retry:
3591         os = ns = drbd_read_state(mdev);
3592         spin_unlock_irq(&mdev->tconn->req_lock);
3593
3594         /* peer says his disk is uptodate, while we think it is inconsistent,
3595          * and this happens while we think we have a sync going on. */
3596         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3597             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3598                 /* If we are (becoming) SyncSource, but peer is still in sync
3599                  * preparation, ignore its uptodate-ness to avoid flapping, it
3600                  * will change to inconsistent once the peer reaches active
3601                  * syncing states.
3602                  * It may have changed syncer-paused flags, however, so we
3603                  * cannot ignore this completely. */
3604                 if (peer_state.conn > C_CONNECTED &&
3605                     peer_state.conn < C_SYNC_SOURCE)
3606                         real_peer_disk = D_INCONSISTENT;
3607
3608                 /* if peer_state changes to connected at the same time,
3609                  * it explicitly notifies us that it finished resync.
3610                  * Maybe we should finish it up, too? */
3611                 else if (os.conn >= C_SYNC_SOURCE &&
3612                          peer_state.conn == C_CONNECTED) {
3613                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3614                                 drbd_resync_finished(mdev);
3615                         return 0;
3616                 }
3617         }
3618
3619         /* peer says his disk is inconsistent, while we think it is uptodate,
3620          * and this happens while the peer still thinks we have a sync going on,
3621          * but we think we are already done with the sync.
3622          * We ignore this to avoid flapping pdsk.
3623          * This should not happen, if the peer is a recent version of drbd. */
3624         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3625             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3626                 real_peer_disk = D_UP_TO_DATE;
3627
3628         if (ns.conn == C_WF_REPORT_PARAMS)
3629                 ns.conn = C_CONNECTED;
3630
3631         if (peer_state.conn == C_AHEAD)
3632                 ns.conn = C_BEHIND;
3633
3634         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3635             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3636                 int cr; /* consider resync */
3637
3638                 /* if we established a new connection */
3639                 cr  = (os.conn < C_CONNECTED);
3640                 /* if we had an established connection
3641                  * and one of the nodes newly attaches a disk */
3642                 cr |= (os.conn == C_CONNECTED &&
3643                        (peer_state.disk == D_NEGOTIATING ||
3644                         os.disk == D_NEGOTIATING));
3645                 /* if we have both been inconsistent, and the peer has been
3646                  * forced to be UpToDate with --overwrite-data */
3647                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3648                 /* if we had been plain connected, and the admin requested to
3649                  * start a sync by "invalidate" or "invalidate-remote" */
3650                 cr |= (os.conn == C_CONNECTED &&
3651                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3652                                  peer_state.conn <= C_WF_BITMAP_T));
3653
3654                 if (cr)
3655                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3656
3657                 put_ldev(mdev);
3658                 if (ns.conn == C_MASK) {
3659                         ns.conn = C_CONNECTED;
3660                         if (mdev->state.disk == D_NEGOTIATING) {
3661                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3662                         } else if (peer_state.disk == D_NEGOTIATING) {
3663                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3664                                 peer_state.disk = D_DISKLESS;
3665                                 real_peer_disk = D_DISKLESS;
3666                         } else {
3667                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3668                                         return -EIO;
3669                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3670                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3671                                 return -EIO;
3672                         }
3673                 }
3674         }
3675
3676         spin_lock_irq(&mdev->tconn->req_lock);
3677         if (os.i != drbd_read_state(mdev).i)
3678                 goto retry;
3679         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3680         ns.peer = peer_state.role;
3681         ns.pdsk = real_peer_disk;
3682         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3683         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3684                 ns.disk = mdev->new_state_tmp.disk;
3685         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3686         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3687             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3688                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3689                    for temporary network outages! */
3690                 spin_unlock_irq(&mdev->tconn->req_lock);
3691                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3692                 tl_clear(mdev->tconn);
3693                 drbd_uuid_new_current(mdev);
3694                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3695                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3696                 return -EIO;
3697         }
3698         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3699         ns = drbd_read_state(mdev);
3700         spin_unlock_irq(&mdev->tconn->req_lock);
3701
3702         if (rv < SS_SUCCESS) {
3703                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3704                 return -EIO;
3705         }
3706
3707         if (os.conn > C_WF_REPORT_PARAMS) {
3708                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3709                     peer_state.disk != D_NEGOTIATING ) {
3710                         /* we want resync, peer has not yet decided to sync... */
3711                         /* Nowadays only used when forcing a node into primary role and
3712                            setting its disk to UpToDate with that */
3713                         drbd_send_uuids(mdev);
3714                         drbd_send_state(mdev);
3715                 }
3716         }
3717
3718         mdev->tconn->net_conf->want_lose = 0;
3719
3720         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3721
3722         return 0;
3723 }
3724
3725 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3726 {
3727         struct drbd_conf *mdev;
3728         struct p_rs_uuid *p = pi->data;
3729
3730         mdev = vnr_to_mdev(tconn, pi->vnr);
3731         if (!mdev)
3732                 return -EIO;
3733
3734         wait_event(mdev->misc_wait,
3735                    mdev->state.conn == C_WF_SYNC_UUID ||
3736                    mdev->state.conn == C_BEHIND ||
3737                    mdev->state.conn < C_CONNECTED ||
3738                    mdev->state.disk < D_NEGOTIATING);
3739
3740         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3741
3742         /* Here the _drbd_uuid_ functions are right, current should
3743            _not_ be rotated into the history */
3744         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3745                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3746                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3747
3748                 drbd_print_uuids(mdev, "updated sync uuid");
3749                 drbd_start_resync(mdev, C_SYNC_TARGET);
3750
3751                 put_ldev(mdev);
3752         } else
3753                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3754
3755         return 0;
3756 }
3757
3758 /**
3759  * receive_bitmap_plain
3760  *
3761  * Return 0 when done, 1 when another iteration is needed, and a negative error
3762  * code upon failure.
3763  */
3764 static int
3765 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3766                      unsigned long *p, struct bm_xfer_ctx *c)
3767 {
3768         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3769                                  drbd_header_size(mdev->tconn);
3770         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3771                                        c->bm_words - c->word_offset);
3772         unsigned int want = num_words * sizeof(*p);
3773         int err;
3774
3775         if (want != size) {
3776                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3777                 return -EIO;
3778         }
3779         if (want == 0)
3780                 return 0;
3781         err = drbd_recv_all(mdev->tconn, p, want);
3782         if (err)
3783                 return err;
3784
3785         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3786
3787         c->word_offset += num_words;
3788         c->bit_offset = c->word_offset * BITS_PER_LONG;
3789         if (c->bit_offset > c->bm_bits)
3790                 c->bit_offset = c->bm_bits;
3791
3792         return 1;
3793 }
3794
3795 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3796 {
3797         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3798 }
3799
3800 static int dcbp_get_start(struct p_compressed_bm *p)
3801 {
3802         return (p->encoding & 0x80) != 0;
3803 }
3804
3805 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3806 {
3807         return (p->encoding >> 4) & 0x7;
3808 }
3809
3810 /**
3811  * recv_bm_rle_bits
3812  *
3813  * Return 0 when done, 1 when another iteration is needed, and a negative error
3814  * code upon failure.
3815  */
3816 static int
3817 recv_bm_rle_bits(struct drbd_conf *mdev,
3818                 struct p_compressed_bm *p,
3819                  struct bm_xfer_ctx *c,
3820                  unsigned int len)
3821 {
3822         struct bitstream bs;
3823         u64 look_ahead;
3824         u64 rl;
3825         u64 tmp;
3826         unsigned long s = c->bit_offset;
3827         unsigned long e;
3828         int toggle = dcbp_get_start(p);
3829         int have;
3830         int bits;
3831
3832         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3833
3834         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3835         if (bits < 0)
3836                 return -EIO;
3837
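             /* the bitmap arrives as alternating runs of clear and set bits,
              * each run length VLI-encoded; only runs of set bits are applied,
              * runs of clear bits merely advance the bit offset */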
3838         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3839                 bits = vli_decode_bits(&rl, look_ahead);
3840                 if (bits <= 0)
3841                         return -EIO;
3842
3843                 if (toggle) {
3844                         e = s + rl -1;
3845                         if (e >= c->bm_bits) {
3846                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3847                                 return -EIO;
3848                         }
3849                         _drbd_bm_set_bits(mdev, s, e);
3850                 }
3851
3852                 if (have < bits) {
3853                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3854                                 have, bits, look_ahead,
3855                                 (unsigned int)(bs.cur.b - p->code),
3856                                 (unsigned int)bs.buf_len);
3857                         return -EIO;
3858                 }
3859                 look_ahead >>= bits;
3860                 have -= bits;
3861
3862                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3863                 if (bits < 0)
3864                         return -EIO;
3865                 look_ahead |= tmp << have;
3866                 have += bits;
3867         }
3868
3869         c->bit_offset = s;
3870         bm_xfer_ctx_bit_to_word_offset(c);
3871
3872         return (s != c->bm_bits);
3873 }
3874
3875 /**
3876  * decode_bitmap_c
3877  *
3878  * Return 0 when done, 1 when another iteration is needed, and a negative error
3879  * code upon failure.
3880  */
3881 static int
3882 decode_bitmap_c(struct drbd_conf *mdev,
3883                 struct p_compressed_bm *p,
3884                 struct bm_xfer_ctx *c,
3885                 unsigned int len)
3886 {
3887         if (dcbp_get_code(p) == RLE_VLI_Bits)
3888                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3889
3890         /* other variants had been implemented for evaluation,
3891          * but have been dropped as this one turned out to be "best"
3892          * during all our tests. */
3893
3894         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3895         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3896         return -EIO;
3897 }
3898
3899 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3900                 const char *direction, struct bm_xfer_ctx *c)
3901 {
3902         /* what would it take to transfer it "plaintext" */
3903         unsigned int header_size = drbd_header_size(mdev->tconn);
3904         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3905         unsigned int plain =
3906                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3907                 c->bm_words * sizeof(unsigned long);
3908         unsigned int total = c->bytes[0] + c->bytes[1];
3909         unsigned int r;
3910
3911         /* total cannot be zero, but just in case: */
3912         if (total == 0)
3913                 return;
3914
3915         /* don't report if not compressed */
3916         if (total >= plain)
3917                 return;
3918
3919         /* total < plain. check for overflow, still */
3920         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3921                                     : (1000 * total / plain);
3922
3923         if (r > 1000)
3924                 r = 1000;
3925
3926         r = 1000 - r;
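             /* r is now (roughly) the saving in per-mille:
              * 1000 * (plain - total) / plain */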
3927         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3928              "total %u; compression: %u.%u%%\n",
3929                         direction,
3930                         c->bytes[1], c->packets[1],
3931                         c->bytes[0], c->packets[0],
3932                         total, r/10, r % 10);
3933 }
3934
3935 /* Since we are processing the bitfield from lower addresses to higher,
3936    it does not matter whether we process it in 32 bit chunks or 64 bit
3937    chunks, as long as it is little endian. (Understand it as a byte stream,
3938    beginning with the lowest byte...) If we used big endian,
3939    we would need to process it from the highest address to the lowest,
3940    in order to be agnostic to the 32 vs 64 bit issue.
3941
3942    Returns 0 on success, a negative error code on failure. */
3943 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3944 {
3945         struct drbd_conf *mdev;
3946         struct bm_xfer_ctx c;
3947         int err;
3948
3949         mdev = vnr_to_mdev(tconn, pi->vnr);
3950         if (!mdev)
3951                 return -EIO;
3952
3953         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3954         /* you are supposed to send additional out-of-sync information
3955          * if you actually set bits during this phase */
3956
3957         c = (struct bm_xfer_ctx) {
3958                 .bm_bits = drbd_bm_bits(mdev),
3959                 .bm_words = drbd_bm_words(mdev),
3960         };
3961
3962         for(;;) {
3963                 if (pi->cmd == P_BITMAP)
3964                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3965                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3966                         /* MAYBE: sanity check that we speak proto >= 90,
3967                          * and the feature is enabled! */
3968                         struct p_compressed_bm *p = pi->data;
3969
3970                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
3971                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3972                                 err = -EIO;
3973                                 goto out;
3974                         }
3975                         if (pi->size <= sizeof(*p)) {
3976                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
3977                                 err = -EIO;
3978                                 goto out;
3979                         }
3980                         err = drbd_recv_all(mdev->tconn, p, pi->size);
3981                         if (err)
3982                                goto out;
3983                         err = decode_bitmap_c(mdev, p, &c, pi->size);
3984                 } else {
3985                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
3986                         err = -EIO;
3987                         goto out;
3988                 }
3989
3990                 c.packets[pi->cmd == P_BITMAP]++;
3991                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
3992
3993                 if (err <= 0) {
3994                         if (err < 0)
3995                                 goto out;
3996                         break;
3997                 }
3998                 err = drbd_recv_header(mdev->tconn, pi);
3999                 if (err)
4000                         goto out;
4001         }
4002
4003         INFO_bm_xfer_stats(mdev, "receive", &c);
4004
4005         if (mdev->state.conn == C_WF_BITMAP_T) {
4006                 enum drbd_state_rv rv;
4007
4008                 err = drbd_send_bitmap(mdev);
4009                 if (err)
4010                         goto out;
4011                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4012                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4013                 D_ASSERT(rv == SS_SUCCESS);
4014         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4015                 /* admin may have requested C_DISCONNECTING,
4016                  * other threads may have noticed network errors */
4017                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4018                     drbd_conn_str(mdev->state.conn));
4019         }
4020         err = 0;
4021
4022  out:
4023         drbd_bm_unlock(mdev);
4024         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4025                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4026         return err;
4027 }
4028
4029 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4030 {
4031         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4032                  pi->cmd, pi->size);
4033
4034         return ignore_remaining_packet(tconn, pi);
4035 }
4036
4037 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4038 {
4039         /* Make sure we've acked all the TCP data associated
4040          * with the data requests being unplugged */
4041         drbd_tcp_quickack(tconn->data.socket);
4042
4043         return 0;
4044 }
4045
4046 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4047 {
4048         struct drbd_conf *mdev;
4049         struct p_block_desc *p = pi->data;
4050
4051         mdev = vnr_to_mdev(tconn, pi->vnr);
4052         if (!mdev)
4053                 return -EIO;
4054
4055         switch (mdev->state.conn) {
4056         case C_WF_SYNC_UUID:
4057         case C_WF_BITMAP_T:
4058         case C_BEHIND:
4059                 break;
4060         default:
4061                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4062                                 drbd_conn_str(mdev->state.conn));
4063         }
4064
4065         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4066
4067         return 0;
4068 }
4069
4070 struct data_cmd {
4071         int expect_payload;
4072         size_t pkt_size;
4073         int (*fn)(struct drbd_tconn *, struct packet_info *);
4074 };
4075
4076 static struct data_cmd drbd_cmd_handler[] = {
4077         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4078         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4079         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4080         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4081         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4082         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4083         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4084         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4085         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4086         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4087         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4088         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4089         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4090         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4091         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4092         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4093         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4094         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4095         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4096         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4097         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4098         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4099         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4100 };
4101
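/* Main loop of the receiver thread: read a packet header, look up the handler in
 * drbd_cmd_handler[], receive the fixed-size part of the packet, and dispatch.
 * Any failure inside the loop takes the connection to C_PROTOCOL_ERROR. */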
4102 static void drbdd(struct drbd_tconn *tconn)
4103 {
4104         struct packet_info pi;
4105         size_t shs; /* sub header size */
4106         int err;
4107
4108         while (get_t_state(&tconn->receiver) == RUNNING) {
4109                 struct data_cmd *cmd;
4110
4111                 drbd_thread_current_set_cpu(&tconn->receiver);
4112                 if (drbd_recv_header(tconn, &pi))
4113                         goto err_out;
4114
4115                 cmd = &drbd_cmd_handler[pi.cmd];
4116                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4117                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4118                         goto err_out;
4119                 }
4120
4121                 shs = cmd->pkt_size;
4122                 if (pi.size > shs && !cmd->expect_payload) {
4123                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4124                         goto err_out;
4125                 }
4126
4127                 if (shs) {
4128                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4129                         if (err)
4130                                 goto err_out;
4131                         pi.size -= shs;
4132                 }
4133
4134                 err = cmd->fn(tconn, &pi);
4135                 if (err) {
4136                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4137                                  cmdname(pi.cmd), err, pi.size);
4138                         goto err_out;
4139                 }
4140         }
4141         return;
4142
4143     err_out:
4144         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4145 }
4146
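/* Queue a barrier work item on the connection's work queue and wait for its
 * completion; once it has run, every work item queued before it has run too. */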
4147 void conn_flush_workqueue(struct drbd_tconn *tconn)
4148 {
4149         struct drbd_wq_barrier barr;
4150
4151         barr.w.cb = w_prev_work_done;
4152         barr.w.tconn = tconn;
4153         init_completion(&barr.done);
4154         drbd_queue_work(&tconn->data.work, &barr.w);
4155         wait_for_completion(&barr.done);
4156 }
4157
4158 static void drbd_disconnect(struct drbd_tconn *tconn)
4159 {
4160         enum drbd_conns oc;
4161         int rv = SS_UNKNOWN_ERROR;
4162
4163         if (tconn->cstate == C_STANDALONE)
4164                 return;
4165
4166         /* asender does not clean up anything. it must not interfere, either */
4167         drbd_thread_stop(&tconn->asender);
4168         drbd_free_sock(tconn);
4169
4170         down_read(&drbd_cfg_rwsem);
4171         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4172         up_read(&drbd_cfg_rwsem);
4173         conn_info(tconn, "Connection closed\n");
4174
4175         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4176                 conn_try_outdate_peer_async(tconn);
4177
4178         spin_lock_irq(&tconn->req_lock);
4179         oc = tconn->cstate;
4180         if (oc >= C_UNCONNECTED)
4181                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4182
4183         spin_unlock_irq(&tconn->req_lock);
4184
4185         if (oc == C_DISCONNECTING) {
4186                 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
4187
4188                 crypto_free_hash(tconn->cram_hmac_tfm);
4189                 tconn->cram_hmac_tfm = NULL;
4190
4191                 kfree(tconn->net_conf);
4192                 tconn->net_conf = NULL;
4193                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE);
4194         }
4195 }
4196
4197 static int drbd_disconnected(int vnr, void *p, void *data)
4198 {
4199         struct drbd_conf *mdev = (struct drbd_conf *)p;
4200         enum drbd_fencing_p fp;
4201         unsigned int i;
4202
4203         /* wait for current activity to cease. */
4204         spin_lock_irq(&mdev->tconn->req_lock);
4205         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4206         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4207         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4208         spin_unlock_irq(&mdev->tconn->req_lock);
4209
4210         /* We do not have data structures that would allow us to
4211          * get the rs_pending_cnt down to 0 again.
4212          *  * On C_SYNC_TARGET we do not have any data structures describing
4213          *    the pending RSDataRequest's we have sent.
4214          *  * On C_SYNC_SOURCE there is no data structure that tracks
4215          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4216          *  And no, it is not the sum of the reference counts in the
4217          *  resync_LRU. The resync_LRU tracks the whole operation including
4218          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4219          *  on the fly. */
4220         drbd_rs_cancel_all(mdev);
4221         mdev->rs_total = 0;
4222         mdev->rs_failed = 0;
4223         atomic_set(&mdev->rs_pending_cnt, 0);
4224         wake_up(&mdev->misc_wait);
4225
4226         del_timer(&mdev->request_timer);
4227
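        /* Let a pending resync timer take effect now: calling resync_timer_fn()
         * directly queues the resync work, so drbd_flush_workqueue() below will
         * wait for it like for everything else already on the queue. */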
4228         del_timer_sync(&mdev->resync_timer);
4229         resync_timer_fn((unsigned long)mdev);
4230
4231         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4232          * w_make_resync_request etc. which may still be on the worker queue
4233          * to be "canceled" */
4234         drbd_flush_workqueue(mdev);
4235
4236         drbd_finish_peer_reqs(mdev);
4237
4238         kfree(mdev->p_uuid);
4239         mdev->p_uuid = NULL;
4240
4241         if (!drbd_suspended(mdev))
4242                 tl_clear(mdev->tconn);
4243
4244         drbd_md_sync(mdev);
4245
4246         fp = FP_DONT_CARE;
4247         if (get_ldev(mdev)) {
4248                 fp = mdev->ldev->dc.fencing;
4249                 put_ldev(mdev);
4250         }
4251
4252         /* serialize with bitmap writeout triggered by the state change,
4253          * if any. */
4254         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4255
4256         /* tcp_close and release of sendpage pages can be deferred.  I don't
4257          * want to use SO_LINGER, because apparently it can be deferred for
4258          * more than 20 seconds (longest time I checked).
4259          *
4260          * Actually we don't care exactly when the network stack does its
4261          * put_page(); we just release our reference on these pages right here.
4262          */
4263         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4264         if (i)
4265                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4266         i = atomic_read(&mdev->pp_in_use_by_net);
4267         if (i)
4268                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4269         i = atomic_read(&mdev->pp_in_use);
4270         if (i)
4271                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4272
4273         D_ASSERT(list_empty(&mdev->read_ee));
4274         D_ASSERT(list_empty(&mdev->active_ee));
4275         D_ASSERT(list_empty(&mdev->sync_ee));
4276         D_ASSERT(list_empty(&mdev->done_ee));
4277
4278         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4279         atomic_set(&mdev->current_epoch->epoch_size, 0);
4280         D_ASSERT(list_empty(&mdev->current_epoch->list));
4281
4282         return 0;
4283 }
4284
4285 /*
4286  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4287  * we can agree on is stored in agreed_pro_version.
4288  *
4289  * feature flags and the reserved array should be enough room for future
4290  * enhancements of the handshake protocol, and possible plugins...
4291  *
4292  * for now, they are expected to be zero, but ignored.
4293  */
4294 static int drbd_send_features(struct drbd_tconn *tconn)
4295 {
4296         struct drbd_socket *sock;
4297         struct p_connection_features *p;
4298
4299         sock = &tconn->data;
4300         p = conn_prepare_command(tconn, sock);
4301         if (!p)
4302                 return -EIO;
4303         memset(p, 0, sizeof(*p));
4304         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4305         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4306         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4307 }
4308
4309 /*
4310  * return values:
4311  *   1 yes, we have a valid connection
4312  *   0 oops, did not work out, please try again
4313  *  -1 peer talks different language,
4314  *     no point in trying again, please go standalone.
4315  */
4316 static int drbd_do_features(struct drbd_tconn *tconn)
4317 {
4318         /* ASSERT current == tconn->receiver ... */
4319         struct p_connection_features *p;
4320         const int expect = sizeof(struct p_connection_features);
4321         struct packet_info pi;
4322         int err;
4323
4324         err = drbd_send_features(tconn);
4325         if (err)
4326                 return 0;
4327
4328         err = drbd_recv_header(tconn, &pi);
4329         if (err)
4330                 return 0;
4331
4332         if (pi.cmd != P_CONNECTION_FEATURES) {
4333                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4334                      cmdname(pi.cmd), pi.cmd);
4335                 return -1;
4336         }
4337
4338         if (pi.size != expect) {
4339                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4340                      expect, pi.size);
4341                 return -1;
4342         }
4343
4344         p = pi.data;
4345         err = drbd_recv_all_warn(tconn, p, expect);
4346         if (err)
4347                 return 0;
4348
4349         p->protocol_min = be32_to_cpu(p->protocol_min);
4350         p->protocol_max = be32_to_cpu(p->protocol_max);
4351         if (p->protocol_max == 0)
4352                 p->protocol_max = p->protocol_min;
4353
4354         if (PRO_VERSION_MAX < p->protocol_min ||
4355             PRO_VERSION_MIN > p->protocol_max)
4356                 goto incompat;
4357
4358         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4359
4360         conn_info(tconn, "Handshake successful: "
4361              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4362
4363         return 1;
4364
4365  incompat:
4366         conn_err(tconn, "incompatible DRBD dialects: "
4367             "I support %d-%d, peer supports %d-%d\n",
4368             PRO_VERSION_MIN, PRO_VERSION_MAX,
4369             p->protocol_min, p->protocol_max);
4370         return -1;
4371 }
4372
4373 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4374 static int drbd_do_auth(struct drbd_tconn *tconn)
4375 {
4376         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4377         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4378         return -1;
4379 }
4380 #else
4381 #define CHALLENGE_LEN 64
4382
4383 /* Return value:
4384         1 - auth succeeded,
4385         0 - failed, try again (network error),
4386         -1 - auth failed, don't try again.
4387 */
4388
4389 static int drbd_do_auth(struct drbd_tconn *tconn)
4390 {
4391         struct drbd_socket *sock;
4392         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4393         struct scatterlist sg;
4394         char *response = NULL;
4395         char *right_response = NULL;
4396         char *peers_ch = NULL;
4397         unsigned int key_len;
4398         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4399         unsigned int resp_size;
4400         struct hash_desc desc;
4401         struct packet_info pi;
4402         struct net_conf *nc;
4403         int err, rv;
4404
4405         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4406
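        /* Copy the shared secret to the stack while holding the RCU read lock:
         * tconn->net_conf may be replaced (and freed) at any time, and the crypto
         * and network calls below may sleep, so we must not remain in the
         * read-side critical section for the rest of the exchange. */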
4407         rcu_read_lock();
4408         nc = rcu_dereference(tconn->net_conf);
4409         key_len = strlen(nc->shared_secret);
4410         memcpy(secret, nc->shared_secret, key_len);
4411         rcu_read_unlock();
4412
4413         desc.tfm = tconn->cram_hmac_tfm;
4414         desc.flags = 0;
4415
4416         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4417         if (rv) {
4418                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4419                 rv = -1;
4420                 goto fail;
4421         }
4422
4423         get_random_bytes(my_challenge, CHALLENGE_LEN);
4424
4425         sock = &tconn->data;
4426         if (!conn_prepare_command(tconn, sock)) {
4427                 rv = 0;
4428                 goto fail;
4429         }
4430         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4431                                 my_challenge, CHALLENGE_LEN);
4432         if (!rv)
4433                 goto fail;
4434
4435         err = drbd_recv_header(tconn, &pi);
4436         if (err) {
4437                 rv = 0;
4438                 goto fail;
4439         }
4440
4441         if (pi.cmd != P_AUTH_CHALLENGE) {
4442                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4443                     cmdname(pi.cmd), pi.cmd);
4444                 rv = 0;
4445                 goto fail;
4446         }
4447
4448         if (pi.size > CHALLENGE_LEN * 2) {
4449                 conn_err(tconn, "AuthChallenge payload too big.\n");
4450                 rv = -1;
4451                 goto fail;
4452         }
4453
4454         peers_ch = kmalloc(pi.size, GFP_NOIO);
4455         if (peers_ch == NULL) {
4456                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4457                 rv = -1;
4458                 goto fail;
4459         }
4460
4461         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4462         if (err) {
4463                 rv = 0;
4464                 goto fail;
4465         }
4466
4467         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4468         response = kmalloc(resp_size, GFP_NOIO);
4469         if (response == NULL) {
4470                 conn_err(tconn, "kmalloc of response failed\n");
4471                 rv = -1;
4472                 goto fail;
4473         }
4474
4475         sg_init_table(&sg, 1);
4476         sg_set_buf(&sg, peers_ch, pi.size);
4477
4478         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4479         if (rv) {
4480                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4481                 rv = -1;
4482                 goto fail;
4483         }
4484
4485         if (!conn_prepare_command(tconn, sock)) {
4486                 rv = 0;
4487                 goto fail;
4488         }
4489         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4490                                 response, resp_size);
4491         if (!rv)
4492                 goto fail;
4493
4494         err = drbd_recv_header(tconn, &pi);
4495         if (err) {
4496                 rv = 0;
4497                 goto fail;
4498         }
4499
4500         if (pi.cmd != P_AUTH_RESPONSE) {
4501                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4502                         cmdname(pi.cmd), pi.cmd);
4503                 rv = 0;
4504                 goto fail;
4505         }
4506
4507         if (pi.size != resp_size) {
4508                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4509                 rv = 0;
4510                 goto fail;
4511         }
4512
4513         err = drbd_recv_all_warn(tconn, response, resp_size);
4514         if (err) {
4515                 rv = 0;
4516                 goto fail;
4517         }
4518
4519         right_response = kmalloc(resp_size, GFP_NOIO);
4520         if (right_response == NULL) {
4521                 conn_err(tconn, "kmalloc of right_response failed\n");
4522                 rv = -1;
4523                 goto fail;
4524         }
4525
4526         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4527
4528         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4529         if (rv) {
4530                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4531                 rv = -1;
4532                 goto fail;
4533         }
4534
4535         rv = !memcmp(response, right_response, resp_size);
4536
4537         if (rv)
4538                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4539                      resp_size);
4540         else
4541                 rv = -1;
4542
4543  fail:
4544         kfree(peers_ch);
4545         kfree(response);
4546         kfree(right_response);
4547
4548         return rv;
4549 }
4550 #endif
4551
4552 int drbdd_init(struct drbd_thread *thi)
4553 {
4554         struct drbd_tconn *tconn = thi->tconn;
4555         int h;
4556
4557         conn_info(tconn, "receiver (re)started\n");
4558
4559         do {
4560                 h = drbd_connect(tconn);
4561                 if (h == 0) {
4562                         drbd_disconnect(tconn);
4563                         schedule_timeout_interruptible(HZ);
4564                 }
4565                 if (h == -1) {
4566                         conn_warn(tconn, "Discarding network configuration.\n");
4567                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4568                 }
4569         } while (h == 0);
4570
4571         if (h > 0) {
4572                 if (get_net_conf(tconn)) {
4573                         drbdd(tconn);
4574                         put_net_conf(tconn);
4575                 }
4576         }
4577
4578         drbd_disconnect(tconn);
4579
4580         conn_info(tconn, "receiver terminated\n");
4581         return 0;
4582 }
4583
4584 /* ********* acknowledge sender ******** */
4585
4586 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4587 {
4588         struct p_req_state_reply *p = pi->data;
4589         int retcode = be32_to_cpu(p->retcode);
4590
4591         if (retcode >= SS_SUCCESS) {
4592                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4593         } else {
4594                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4595                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4596                          drbd_set_st_err_str(retcode), retcode);
4597         }
4598         wake_up(&tconn->ping_wait);
4599
4600         return 0;
4601 }
4602
4603 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4604 {
4605         struct drbd_conf *mdev;
4606         struct p_req_state_reply *p = pi->data;
4607         int retcode = be32_to_cpu(p->retcode);
4608
4609         mdev = vnr_to_mdev(tconn, pi->vnr);
4610         if (!mdev)
4611                 return -EIO;
4612
4613         if (retcode >= SS_SUCCESS) {
4614                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4615         } else {
4616                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4617                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4618                         drbd_set_st_err_str(retcode), retcode);
4619         }
4620         wake_up(&mdev->state_wait);
4621
4622         return 0;
4623 }
4624
4625 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4626 {
4627         return drbd_send_ping_ack(tconn);
4629 }
4630
4631 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4632 {
4633         /* The idle timeout is restored by the asender loop itself (using a
4634          * ping_int value read under rcu_read_lock()) once this handler returns. */
4635         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4636                 wake_up(&tconn->ping_wait);
4637
4638         return 0;
4639 }
4640
4641 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4642 {
4643         struct drbd_conf *mdev;
4644         struct p_block_ack *p = pi->data;
4645         sector_t sector = be64_to_cpu(p->sector);
4646         int blksize = be32_to_cpu(p->blksize);
4647
4648         mdev = vnr_to_mdev(tconn, pi->vnr);
4649         if (!mdev)
4650                 return -EIO;
4651
4652         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4653
4654         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4655
4656         if (get_ldev(mdev)) {
4657                 drbd_rs_complete_io(mdev, sector);
4658                 drbd_set_in_sync(mdev, sector, blksize);
4659                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4660                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4661                 put_ldev(mdev);
4662         }
4663         dec_rs_pending(mdev);
4664         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4665
4666         return 0;
4667 }
4668
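/* Look up the request identified by (id, sector) in the given tree, apply the
 * state transition 'what' to it under req_lock, and complete the master bio if
 * that transition finished it.  Returns -EIO if no matching request is found. */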
4669 static int
4670 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4671                               struct rb_root *root, const char *func,
4672                               enum drbd_req_event what, bool missing_ok)
4673 {
4674         struct drbd_request *req;
4675         struct bio_and_error m;
4676
4677         spin_lock_irq(&mdev->tconn->req_lock);
4678         req = find_request(mdev, root, id, sector, missing_ok, func);
4679         if (unlikely(!req)) {
4680                 spin_unlock_irq(&mdev->tconn->req_lock);
4681                 return -EIO;
4682         }
4683         __req_mod(req, what, &m);
4684         spin_unlock_irq(&mdev->tconn->req_lock);
4685
4686         if (m.bio)
4687                 complete_master_bio(mdev, &m);
4688         return 0;
4689 }
4690
4691 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4692 {
4693         struct drbd_conf *mdev;
4694         struct p_block_ack *p = pi->data;
4695         sector_t sector = be64_to_cpu(p->sector);
4696         int blksize = be32_to_cpu(p->blksize);
4697         enum drbd_req_event what;
4698
4699         mdev = vnr_to_mdev(tconn, pi->vnr);
4700         if (!mdev)
4701                 return -EIO;
4702
4703         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4704
4705         if (p->block_id == ID_SYNCER) {
4706                 drbd_set_in_sync(mdev, sector, blksize);
4707                 dec_rs_pending(mdev);
4708                 return 0;
4709         }
4710         switch (pi->cmd) {
4711         case P_RS_WRITE_ACK:
4712                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4713                 break;
4714         case P_WRITE_ACK:
4715                 what = WRITE_ACKED_BY_PEER;
4716                 break;
4717         case P_RECV_ACK:
4718                 what = RECV_ACKED_BY_PEER;
4719                 break;
4720         case P_DISCARD_WRITE:
4721                 what = DISCARD_WRITE;
4722                 break;
4723         case P_RETRY_WRITE:
4724                 what = POSTPONE_WRITE;
4725                 break;
4726         default:
4727                 BUG();
4728         }
4729
4730         return validate_req_change_req_state(mdev, p->block_id, sector,
4731                                              &mdev->write_requests, __func__,
4732                                              what, false);
4733 }
4734
4735 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4736 {
4737         struct drbd_conf *mdev;
4738         struct p_block_ack *p = pi->data;
4739         sector_t sector = be64_to_cpu(p->sector);
4740         int size = be32_to_cpu(p->blksize);
4741         int err;
4742
4743         mdev = vnr_to_mdev(tconn, pi->vnr);
4744         if (!mdev)
4745                 return -EIO;
4746
4747         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4748
4749         if (p->block_id == ID_SYNCER) {
4750                 dec_rs_pending(mdev);
4751                 drbd_rs_failed_io(mdev, sector, size);
4752                 return 0;
4753         }
4754
4755         err = validate_req_change_req_state(mdev, p->block_id, sector,
4756                                             &mdev->write_requests, __func__,
4757                                             NEG_ACKED, true);
4758         if (err) {
4759                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4760                    The master bio might already be completed, therefore the
4761                    request is no longer in the collision hash. */
4762                 /* In Protocol B we might already have got a P_RECV_ACK
4763                    but then get a P_NEG_ACK afterwards. */
4764                 drbd_set_out_of_sync(mdev, sector, size);
4765         }
4766         return 0;
4767 }
4768
4769 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4770 {
4771         struct drbd_conf *mdev;
4772         struct p_block_ack *p = pi->data;
4773         sector_t sector = be64_to_cpu(p->sector);
4774
4775         mdev = vnr_to_mdev(tconn, pi->vnr);
4776         if (!mdev)
4777                 return -EIO;
4778
4779         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4780
4781         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4782             (unsigned long long)sector, be32_to_cpu(p->blksize));
4783
4784         return validate_req_change_req_state(mdev, p->block_id, sector,
4785                                              &mdev->read_requests, __func__,
4786                                              NEG_ACKED, false);
4787 }
4788
4789 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4790 {
4791         struct drbd_conf *mdev;
4792         sector_t sector;
4793         int size;
4794         struct p_block_ack *p = pi->data;
4795
4796         mdev = vnr_to_mdev(tconn, pi->vnr);
4797         if (!mdev)
4798                 return -EIO;
4799
4800         sector = be64_to_cpu(p->sector);
4801         size = be32_to_cpu(p->blksize);
4802
4803         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4804
4805         dec_rs_pending(mdev);
4806
4807         if (get_ldev_if_state(mdev, D_FAILED)) {
4808                 drbd_rs_complete_io(mdev, sector);
4809                 switch (pi->cmd) {
4810                 case P_NEG_RS_DREPLY:
4811                         drbd_rs_failed_io(mdev, sector, size);
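                        /* fall through */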
4812                 case P_RS_CANCEL:
4813                         break;
4814                 default:
4815                         BUG();
4816                 }
4817                 put_ldev(mdev);
4818         }
4819
4820         return 0;
4821 }
4822
4823 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4824 {
4825         struct drbd_conf *mdev;
4826         struct p_barrier_ack *p = pi->data;
4827
4828         mdev = vnr_to_mdev(tconn, pi->vnr);
4829         if (!mdev)
4830                 return -EIO;
4831
4832         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4833
4834         if (mdev->state.conn == C_AHEAD &&
4835             atomic_read(&mdev->ap_in_flight) == 0 &&
4836             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4837                 mdev->start_resync_timer.expires = jiffies + HZ;
4838                 add_timer(&mdev->start_resync_timer);
4839         }
4840
4841         return 0;
4842 }
4843
4844 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4845 {
4846         struct drbd_conf *mdev;
4847         struct p_block_ack *p = pi->data;
4848         struct drbd_work *w;
4849         sector_t sector;
4850         int size;
4851
4852         mdev = vnr_to_mdev(tconn, pi->vnr);
4853         if (!mdev)
4854                 return -EIO;
4855
4856         sector = be64_to_cpu(p->sector);
4857         size = be32_to_cpu(p->blksize);
4858
4859         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4860
4861         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4862                 drbd_ov_out_of_sync_found(mdev, sector, size);
4863         else
4864                 ov_out_of_sync_print(mdev);
4865
4866         if (!get_ldev(mdev))
4867                 return 0;
4868
4869         drbd_rs_complete_io(mdev, sector);
4870         dec_rs_pending(mdev);
4871
4872         --mdev->ov_left;
4873
4874         /* let's advance progress step marks only for every other megabyte */
4875         if ((mdev->ov_left & 0x200) == 0x200)
4876                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4877
4878         if (mdev->ov_left == 0) {
4879                 w = kmalloc(sizeof(*w), GFP_NOIO);
4880                 if (w) {
4881                         w->cb = w_ov_finished;
4882                         w->mdev = mdev;
4883                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4884                 } else {
4885                         dev_err(DEV, "kmalloc(w) failed.");
4886                         ov_out_of_sync_print(mdev);
4887                         drbd_resync_finished(mdev);
4888                 }
4889         }
4890         put_ldev(mdev);
4891         return 0;
4892 }
4893
4894 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4895 {
4896         return 0;
4897 }
4898
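/* Process the done_ee list of every volume on this connection, and repeat until
 * all of those lists are empty.  Returns 1 on error, 0 otherwise. */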
4899 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4900 {
4901         struct drbd_conf *mdev;
4902         int i, not_empty = 0;
4903
4904         do {
4905                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4906                 flush_signals(current);
4907                 down_read(&drbd_cfg_rwsem);
4908                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4909                         if (drbd_finish_peer_reqs(mdev)) {
4910                                 up_read(&drbd_cfg_rwsem);
4911                                 return 1; /* error */
4912                         }
4913                 }
4914                 up_read(&drbd_cfg_rwsem);
4915                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4916
4917                 spin_lock_irq(&tconn->req_lock);
4918                 rcu_read_lock();
4919                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4920                         not_empty = !list_empty(&mdev->done_ee);
4921                         if (not_empty)
4922                                 break;
4923                 }
4924                 rcu_read_unlock();
4925                 spin_unlock_irq(&tconn->req_lock);
4926         } while (not_empty);
4927
4928         return 0;
4929 }
4930
4931 struct asender_cmd {
4932         size_t pkt_size;
4933         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4934 };
4935
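/* Dispatch table for packets arriving on the meta data socket; pkt_size is the
 * payload size expected after the header, received in full before fn is called. */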
4936 static struct asender_cmd asender_tbl[] = {
4937         [P_PING]            = { 0, got_Ping },
4938         [P_PING_ACK]        = { 0, got_PingAck },
4939         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4940         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4941         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4942         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4943         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4944         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4945         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4946         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4947         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4948         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4949         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4950         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4951         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4952         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4953         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4954 };
4955
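/* The asender thread: sends pings, flushes the done_ee lists, and receives and
 * dispatches acknowledgement packets on the meta data socket. */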
4956 int drbd_asender(struct drbd_thread *thi)
4957 {
4958         struct drbd_tconn *tconn = thi->tconn;
4959         struct asender_cmd *cmd = NULL;
4960         struct packet_info pi;
4961         int rv;
4962         void *buf    = tconn->meta.rbuf;
4963         int received = 0;
4964         unsigned int header_size = drbd_header_size(tconn);
4965         int expect   = header_size;
4966         bool ping_timeout_active = false;
4967         struct net_conf *nc;
4968         int ping_timeo, no_cork, ping_int;
4969
4970         current->policy = SCHED_RR;  /* Make this a realtime task! */
4971         current->rt_priority = 2;    /* more important than all other tasks */
4972
4973         while (get_t_state(thi) == RUNNING) {
4974                 drbd_thread_current_set_cpu(thi);
4975
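                /* Snapshot the relevant net_conf tunables under rcu_read_lock():
                 * the configuration may be replaced concurrently, and the socket
                 * operations below may block, so we work on local copies. */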
4976                 rcu_read_lock();
4977                 nc = rcu_dereference(tconn->net_conf);
4978                 ping_timeo = nc->ping_timeo;
4979                 no_cork = nc->no_cork;
4980                 ping_int = nc->ping_int;
4981                 rcu_read_unlock();
4982
4983                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
4984                         if (drbd_send_ping(tconn)) {
4985                                 conn_err(tconn, "drbd_send_ping has failed\n");
4986                                 goto reconnect;
4987                         }
4988                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
4989                         ping_timeout_active = true;
4990                 }
4991
4992                 /* TODO: conditionally cork; it may hurt latency if we cork without
4993                    much to send */
4994                 if (!no_cork)
4995                         drbd_tcp_cork(tconn->meta.socket);
4996                 if (tconn_finish_peer_reqs(tconn)) {
4997                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
4998                         goto reconnect;
4999                 }
5000                 /* but unconditionally uncork unless disabled */
5001                 if (!no_cork)
5002                         drbd_tcp_uncork(tconn->meta.socket);
5003
5004                 /* short circuit, recv_msg would return EINTR anyway. */
5005                 if (signal_pending(current))
5006                         continue;
5007
5008                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5009                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5010
5011                 flush_signals(current);
5012
5013                 /* Note:
5014                  * -EINTR        (on meta) we got a signal
5015                  * -EAGAIN       (on meta) rcvtimeo expired
5016                  * -ECONNRESET   other side closed the connection
5017                  * -ERESTARTSYS  (on data) we got a signal
5018                  * rv <  0       other than above: unexpected error!
5019                  * rv == expected: full header or command
5020                  * rv <  expected: "woken" by signal during receive
5021                  * rv == 0       : "connection shut down by peer"
5022                  */
5023                 if (likely(rv > 0)) {
5024                         received += rv;
5025                         buf      += rv;
5026                 } else if (rv == 0) {
5027                         conn_err(tconn, "meta connection shut down by peer.\n");
5028                         goto reconnect;
5029                 } else if (rv == -EAGAIN) {
5030                         /* If the data socket received something meanwhile,
5031                          * that is good enough: peer is still alive. */
5032                         if (time_after(tconn->last_received,
5033                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5034                                 continue;
5035                         if (ping_timeout_active) {
5036                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5037                                 goto reconnect;
5038                         }
5039                         set_bit(SEND_PING, &tconn->flags);
5040                         continue;
5041                 } else if (rv == -EINTR) {
5042                         continue;
5043                 } else {
5044                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5045                         goto reconnect;
5046                 }
5047
5048                 if (received == expect && cmd == NULL) {
5049                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5050                                 goto reconnect;
5051                         cmd = &asender_tbl[pi.cmd];
5052                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5053                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5054                                         pi.cmd, pi.size);
5055                                 goto disconnect;
5056                         }
5057                         expect = header_size + cmd->pkt_size;
5058                         if (pi.size != expect - header_size) {
5059                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5060                                         pi.cmd, pi.size);
5061                                 goto reconnect;
5062                         }
5063                 }
5064                 if (received == expect) {
5065                         bool err;
5066
5067                         err = cmd->fn(tconn, &pi);
5068                         if (err) {
5069                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5070                                 goto reconnect;
5071                         }
5072
5073                         tconn->last_received = jiffies;
5074
5075                         if (cmd == &asender_tbl[P_PING_ACK]) {
5076                                 /* restore idle timeout */
5077                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5078                                 ping_timeout_active = false;
5079                         }
5080
5081                         buf      = tconn->meta.rbuf;
5082                         received = 0;
5083                         expect   = header_size;
5084                         cmd      = NULL;
5085                 }
5086         }
5087
5088         if (0) {
5089 reconnect:
5090                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5091         }
5092         if (0) {
5093 disconnect:
5094                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5095         }
5096         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5097
5098         conn_info(tconn, "asender terminated\n");
5099
5100         return 0;
5101 }