/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>
#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
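/* Note: GFP_TRY deliberately has no __GFP_WAIT, so these opportunistic
 * allocations may fail quickly instead of sleeping in reclaim (which could
 * trigger write-out, see the comment in drbd_pp_first_pages_or_try_alloc()),
 * and failure is expected and handled, hence __GFP_NOWARN. */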
/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
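/* Layout example: a chain of three pages A -> B -> C is simply
 *	page_private(A) == (unsigned long)B,
 *	page_private(B) == (unsigned long)C,
 *	page_private(C) == 0,
 * i.e. a zero ->private marks the end of the chain, and no extra memory
 * is needed to keep pages linked. */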
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
		tmp = page_chain_next(page);
			break; /* found sufficient pages */
			/* insufficient pages, don't use any of them. */

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)
	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
			   struct page *chain_first, struct page *chain_last)
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	tmp = page_chain_tail(page, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);
	spin_unlock(&drbd_pp_lock);
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
		list_move(le, to_be_freed);
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
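/*
 * Usage sketch (as in drbd_alloc_ee() and drbd_free_some_ee() below):
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, gfp_mask & __GFP_WAIT);
 *	...
 *	drbd_pp_free(mdev, page, 0);
 */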
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);
	i = atomic_sub_return(i, a);
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_process_done_ee()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
	struct drbd_peer_request *peer_req;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);

	/*
	 * The block_id is opaque to the receiver. It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;
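	/* Note: on the node that originated a request, this opaque id is the
	 * struct drbd_request pointer itself; find_request() below relies on
	 * that when it casts a received block_id back to a request. */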
	mempool_free(peer_req, drbd_ee_mempool);
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = peer_req->w.cb(mdev, &peer_req->w, !ok) && ok;
		drbd_free_ee(mdev, peer_req);

	wake_up(&mdev->ee_wait);
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
/* see also kernel_accept, which is only present since 2.6.18.
 * We also want to log exactly which part of it failed. */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
	struct sock *sk = sock->sk;

	err = sock->ops->listen(sock, 5);

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);

	err = sock->ops->accept(sock, *newsock, 0);
		sock_release(*newsock);

	(*newsock)->ops = sock->ops;
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};

	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);

	/* possible error values:
	 * ECONNRESET	other side closed the connection
	 * ERESTARTSYS	(on sock) we got a signal
	 */
		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");

		/* signal came in, or peer/link went down,
		 * after we read a partial message
		 */
		/* D_ASSERT(signal_pending(current)); */
		drbd_force_state(tconn->volume0, NS(conn, C_BROKEN_PIPE));

/*
 * On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
	/* open coded SO_SNDBUF, SO_RCVBUF */
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;

		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
	struct sockaddr_in6 src_in6;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

		/* timeout, busy, signal pending */
	case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
	case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
	case ECONNREFUSED: case ENETUNREACH:
	case EHOSTDOWN:    case EHOSTUNREACH:
		disconnect_on_error = 0;

	conn_err(tconn, "%s failed, err = %d\n", what, err);

	if (disconnect_on_error)
		drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
	struct socket *s_estab = NULL, *s_listen;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);

	err = drbd_accept(&what, s_listen, &s_estab);

	sock_release(s_listen);

	if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
		conn_err(tconn, "%s failed, err = %d\n", what, err);
		drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
	struct p_header *h = &tconn->data.sbuf.header;

	return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);

static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
	struct p_header80 *h = &tconn->data.rbuf.header.h80;

	rr = drbd_recv_short(sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h->command);
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)
	struct socket *s, *sock, *msock;

	D_ASSERT(!mdev->tconn->data.socket);

	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)

	clear_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
	mdev->tconn->agreed_pro_version = 99;
	/* agreed_pro_version must be smaller than 100 so we send the old
	   header (h80) in the first packet and in the handshake packet. */

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(mdev->tconn);
		/* give the other side time to call bind() & listen() */
		schedule_timeout_interruptible(HZ / 10);

				drbd_send_fp(mdev->tconn, s, P_HAND_SHAKE_S);
				drbd_send_fp(mdev->tconn, s, P_HAND_SHAKE_M);
				dev_err(DEV, "Logic error in drbd_connect()\n");
				goto out_release_sockets;

			schedule_timeout_interruptible(mdev->tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;

		s = drbd_wait_for_connect(mdev->tconn);
			try = drbd_recv_fp(mdev->tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
				dev_warn(DEV, "initial packet S crossed\n");
				dev_warn(DEV, "initial packet M crossed\n");
				set_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
				dev_warn(DEV, "Error receiving initial packet\n");

		if (mdev->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&mdev->tconn->receiver) == EXITING)
				goto out_release_sockets;

			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/*
	 * sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	mdev->tconn->data.socket = sock;
	mdev->tconn->meta.socket = msock;
	mdev->tconn->last_received = jiffies;

	D_ASSERT(mdev->tconn->asender.task == NULL);

	h = drbd_do_handshake(mdev);

	if (mdev->tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(mdev)) {
			dev_err(DEV, "Authentication of peer failed\n");
			dev_err(DEV, "Authentication of peer failed, trying again.\n");

	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)

	sock->sk->sk_sndtimeo = mdev->tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	atomic_set(&mdev->packet_seq, 0);

	drbd_thread_start(&mdev->tconn->asender);

	if (drbd_send_protocol(mdev) == -1)
	drbd_send_sync_param(mdev, &mdev->sync_conf);
	drbd_send_sizes(mdev, 0, 0);
	drbd_send_uuids(mdev);
	drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
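/*
 * Sketch of the two wire header layouts decode_header() below accepts;
 * field widths are inferred from the be16/be32 conversions used there:
 *
 *	h80: u32 magic (DRBD_MAGIC),     u16 command, u16 length
 *	h95: u16 magic (DRBD_MAGIC_BIG), u16 command, u32 length
 *	     (of which only the low 24 bits are used)
 */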
static bool decode_header(struct drbd_conf *mdev, struct p_header *h,
			  enum drbd_packet *cmd, unsigned int *packet_size)
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		*cmd = be16_to_cpu(h->h80.command);
		*packet_size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		*cmd = be16_to_cpu(h->h95.command);
		*packet_size = be32_to_cpu(h->h95.length) & 0x00ffffff;
	} else {
		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
			be32_to_cpu(h->h80.magic),
			be16_to_cpu(h->h80.command),
			be16_to_cpu(h->h80.length));
static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packet *cmd,
			    unsigned int *packet_size)
	struct p_header *h = &mdev->tconn->data.rbuf.header;

	r = drbd_recv(mdev->tconn, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);

	r = decode_header(mdev, h, cmd, packet_size);
	mdev->tconn->last_received = jiffies;
static void drbd_flush(struct drbd_conf *mdev)
	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
			atomic_dec(&epoch->active);
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
		case EV_BECAME_LAST:

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);

				if (rv == FE_STILL_LIVE)

				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
				wake_up(&mdev->ee_wait);

	spin_unlock(&mdev->epoch_lock);
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
/**
 * drbd_submit_ee()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		   const unsigned rw, const int fault_type)
	struct bio *bios = NULL;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_endio_sec;

	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);

	D_ASSERT(page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&mdev->misc_wait);
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
		if (rv == FE_RECYCLED)

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");

		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
	spin_unlock(&mdev->epoch_lock);
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev->tconn, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);

	if (!expect(data_size != 0))
	if (!expect(IS_ALIGNED(data_size, 512)))
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);

	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		if (rr != len) {
			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
					rr, len);

	if (dgs) {
		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
				      dgs, dig_in, dig_vv, peer_req);
			drbd_free_ee(mdev, peer_req);

	mdev->recv_cnt += data_size>>9;
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
	page = drbd_pp_alloc(mdev, 1, 1);

	rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
	if (rr != min_t(int, data_size, PAGE_SIZE)) {
		if (!signal_pending(current))
			dev_warn(DEV,
				"short read receiving data: read %d expected %d\n",
				rr, min_t(int, data_size, PAGE_SIZE));

	drbd_pp_free(mdev, page, 0);
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
	struct bio_vec *bvec;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev->tconn, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			       kmap(bvec->bv_page)+bvec->bv_offset,
			       expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);

	if (dgs) {
		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");

	D_ASSERT(data_size == 0);
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	sector_t sector = peer_req->i.sector;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
	struct drbd_request *req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

	if (ok)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */
static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_sec. */
		ok = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);

	atomic_add(data_size >> 9, &mdev->rs_sect_in);
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	sector_t sector = peer_req->i.sector;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
		} else {
			ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		spin_unlock_irq(&mdev->tconn->req_lock);
	} else
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;

	D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);

	spin_lock_irq(&mdev->tconn->req_lock);
	D_ASSERT(!drbd_interval_empty(&peer_req->i));
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *	a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}
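/*
 * Example: seq_greater(1, 0xffffffff) is true, because
 * (s32)1 - (s32)0xffffffff == 2 > 0; after a wrap, the small sequence
 * number is correctly treated as the more recent one.
 */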
static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
	unsigned int old_peer_seq;

	spin_lock(&mdev->peer_seq_lock);
	old_peer_seq = mdev->peer_seq;
	mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
	spin_unlock(&mdev->peer_seq_lock);
	if (old_peer_seq != peer_seq)
		wake_up(&mdev->seq_wait);
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		if (!seq_greater(packet_seq, mdev->peer_seq + 1))
			break;
		if (signal_pending(current)) {

		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");

	finish_wait(&mdev->seq_wait, &wait);
	if (mdev->peer_seq+1 == packet_seq)
		mdev->peer_seq++;
	spin_unlock(&mdev->peer_seq_lock);
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
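/*
 * For example, a peer write sent with DP_FLUSH | DP_FUA set is resubmitted
 * to our backing device with REQ_FLUSH | REQ_FUA, preserving the peer's
 * ordering and durability requirements locally.
 */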
/* mirrored write */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	if (!get_ldev(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
			mdev->peer_seq++;
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);

	/* get_ldev(mdev) successful.
	 * Corresponding put_ldev done either below (on various errors),
	 * or in drbd_endio_sec, if we successfully submit the data at
	 * the end of this function. */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference. */
	if (!mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
	} else {
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = peer_req->i.size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);

		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check for conflicting write requests.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * if no conflicting request is found:
		 *    submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	 block the receiver, waiting on misc_wait
		 *	 until no more conflicting requests are there,
		 *	 or we get interrupted (disconnect).
		 *
		 *	 we do not just write after local io completion of those
		 *	 requests, but only after req is done completely, i.e.
		 *	 we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	 then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->tconn->req_lock);

		for (;;) {
			struct drbd_interval *i;
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
					TASK_INTERRUPTIBLE);

			i = drbd_find_overlap(&mdev->write_requests, sector, size);
				/* only ALERT on first iteration,
				 * we may be woken up early... */
				dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
					" new: %llus +%u; pending: %llus +%u\n",
					current->comm, current->pid,
					i->local ? "local" : "remote",
					(unsigned long long)sector, size,
					(unsigned long long)i->sector, i->size);

					struct drbd_request *req2;

					req2 = container_of(i, struct drbd_request, i);
					if (req2->rq_state & RQ_NET_PENDING)

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
					(unsigned long long)sector);

				peer_req->w.cb = e_send_discard_ack;
				list_add_tail(&peer_req->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->tconn->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				wake_asender(mdev->tconn);
				finish_wait(&mdev->misc_wait, &wait);

			if (signal_pending(current)) {
				spin_unlock_irq(&mdev->tconn->req_lock);
				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;

			/* Indicate to wake up mdev->misc_wait upon completion. */

			spin_unlock_irq(&mdev->tconn->req_lock);
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
					"sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);

			spin_lock_irq(&mdev->tconn->req_lock);

		finish_wait(&mdev->misc_wait, &wait);

		drbd_insert_interval(&mdev->write_requests, &peer_req->i);

	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);

	if (drbd_submit_ee(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

out_interrupted:
	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
	drbd_free_ee(mdev, peer_req);
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;

	/* feature disabled? */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
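		/* db is the number of bitmap bits that went in sync during dt
		 * seconds; one bit covers a 4 KiB block, so Bit2KB(db/dt) is
		 * the recent resync rate in KiB/s, comparable to c_min_rate. */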
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->sync_conf.c_min_rate)
			return 1;
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
				"no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(mdev, digest_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);

	switch (cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);

	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;

		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;

			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			}
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;

	default:
		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
			cmdname(cmd));
		fault_type = DRBD_FAULT_MAX;

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only. */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(mdev, sector))

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_ee(mdev, peer_req, READ, fault_type) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	drbd_free_ee(mdev, peer_req);
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
			 "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
		if (ch_peer == 0) { rv =  1; break; }
		if (ch_self == 0) { rv = -1; break; }
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
				? -1 : 1;
	case ASB_DISCARD_LOCAL:
		rv = -1;
	case ASB_DISCARD_REMOTE:
		rv =  1;
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
	switch (mdev->tconn->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1  && mdev->state.role == R_PRIMARY)
			rv = hg;
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			drbd_set_role(mdev, R_SECONDARY, 0);
			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
	switch (mdev->tconn->net_conf->after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_SECONDARY:
		dev_err(DEV, "Configuration error.\n");
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCONNECT:
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
2357 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2358 u64 bits, u64 flags)
2361 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2364 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2366 (unsigned long long)uuid[UI_CURRENT],
2367 (unsigned long long)uuid[UI_BITMAP],
2368 (unsigned long long)uuid[UI_HISTORY_START],
2369 (unsigned long long)uuid[UI_HISTORY_END],
2370 (unsigned long long)bits,
2371 (unsigned long long)flags);
2375 100 after split brain, try auto recover
2376 2 C_SYNC_SOURCE set BitMap
2377 1 C_SYNC_SOURCE use BitMap
2379 -1 C_SYNC_TARGET use BitMap
2380 -2 C_SYNC_TARGET set BitMap
2381 -100 after split brain, disconnect
2382 -1000 unrelated data
2383 -1091 requires proto 91
2384 -1096 requires proto 96
2386 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2391 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2392 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2395 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2399 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2400 peer != UUID_JUST_CREATED)
2404 if (self != UUID_JUST_CREATED &&
2405 (peer == UUID_JUST_CREATED || peer == (u64)0))
2409 int rct, dc; /* roles at crash time */
2411 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2413 if (mdev->tconn->agreed_pro_version < 91)
2416 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2417 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2418 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2419 drbd_uuid_set_bm(mdev, 0UL);
2421 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2422 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2425 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2432 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2434 if (mdev->tconn->agreed_pro_version < 91)
2437 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2438 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2439 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2441 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2442 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2443 mdev->p_uuid[UI_BITMAP] = 0UL;
2445 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2448 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2455 /* Common power [off|failure] */
2456 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2457 (mdev->p_uuid[UI_FLAGS] & 2);
2458 /* lowest bit is set when we were primary,
2459 * next bit (weight 2) is set when peer was primary */
2463 case 0: /* !self_pri && !peer_pri */ return 0;
2464 case 1: /* self_pri && !peer_pri */ return 1;
2465 case 2: /* !self_pri && peer_pri */ return -1;
2466 case 3: /* self_pri && peer_pri */
2467 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2473 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2478 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2480 if (mdev->tconn->agreed_pro_version < 96 ?
2481 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2482 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2483 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2484 /* The last P_SYNC_UUID did not get through. Undo the last
2485 start-of-resync-as-sync-source modifications of the peer's UUIDs. */
2487 if (mdev->tconn->agreed_pro_version < 91)
2490 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2491 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2493 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2494 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2501 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2502 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2503 peer = mdev->p_uuid[i] & ~((u64)1);
2509 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2510 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2515 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2517 if (mdev->tconn->agreed_pro_version < 96 ?
2518 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2519 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2520 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2521 /* The last P_SYNC_UUID did not get through. Undo the last
2522 start-of-resync-as-sync-source modifications of our UUIDs. */
2524 if (mdev->tconn->agreed_pro_version < 91)
2527 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2528 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2530 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2531 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2532 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2540 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2541 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2542 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2548 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2549 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2550 if (self == peer && self != ((u64)0))
2554 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2555 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2556 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2557 peer = mdev->p_uuid[j] & ~((u64)1);
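/*
 * Illustrative sketch (assumption, not original code): bit 0 of every
 * UUID is a "device was primary while this UUID was current" flag, which
 * is why all comparisons above mask with ~((u64)1) first.  The masking is
 * open-coded throughout; a helper expressing the same idea could look like:
 */
#if 0	/* example only */
static inline int drbd_uuids_equal(u64 a, u64 b)
{
	/* compare UUID values, ignoring the "was primary" flag in bit 0 */
	return (a & ~((u64)1)) == (b & ~((u64)1));
}
#endif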
2566 /* drbd_sync_handshake() returns the new conn state on success, or
2567 C_MASK (-1) on failure.
2569 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2570 enum drbd_disk_state peer_disk) __must_hold(local)
2573 enum drbd_conns rv = C_MASK;
2574 enum drbd_disk_state mydisk;
2576 mydisk = mdev->state.disk;
2577 if (mydisk == D_NEGOTIATING)
2578 mydisk = mdev->new_state_tmp.disk;
2580 dev_info(DEV, "drbd_sync_handshake:\n");
2581 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2582 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2583 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2585 hg = drbd_uuid_compare(mdev, &rule_nr);
2587 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2590 dev_alert(DEV, "Unrelated data, aborting!\n");
2594 dev_alert(DEV, "To resolve this, both sides have to support at least protocol %d\n", -hg - 1000);
2598 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2599 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2600 int f = (hg == -100) || abs(hg) == 2;
2601 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2604 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2605 hg > 0 ? "source" : "target");
2609 drbd_khelper(mdev, "initial-split-brain");
2611 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2612 int pcount = (mdev->state.role == R_PRIMARY)
2613 + (peer_role == R_PRIMARY);
2614 int forced = (hg == -100);
2618 hg = drbd_asb_recover_0p(mdev);
2621 hg = drbd_asb_recover_1p(mdev);
2624 hg = drbd_asb_recover_2p(mdev);
2627 if (abs(hg) < 100) {
2628 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2629 "automatically solved. Sync from %s node\n",
2630 pcount, (hg < 0) ? "peer" : "this");
2632 dev_warn(DEV, "Doing a full sync, since"
2633 " UUIDs where ambiguous.\n");
2640 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2642 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2646 dev_warn(DEV, "Split-Brain detected, manually solved. "
2647 "Sync from %s node\n",
2648 (hg < 0) ? "peer" : "this");
2652 /* FIXME this log message is not correct if we end up here
2653 * after an attempted attach on a diskless node.
2654 * We just refuse to attach -- well, we drop the "connection"
2655 * to that disk, in a way... */
2656 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2657 drbd_khelper(mdev, "split-brain");
2661 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2662 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2666 if (hg < 0 && /* by intention we do not use mydisk here. */
2667 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2668 switch (mdev->tconn->net_conf->rr_conflict) {
2669 case ASB_CALL_HELPER:
2670 drbd_khelper(mdev, "pri-lost");
2672 case ASB_DISCONNECT:
2673 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2676 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2681 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2683 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2685 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2686 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2687 abs(hg) >= 2 ? "full" : "bit-map based");
2692 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2693 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2694 BM_LOCKED_SET_ALLOWED))
2698 if (hg > 0) { /* become sync source. */
2700 } else if (hg < 0) { /* become sync target */
2704 if (drbd_bm_total_weight(mdev)) {
2705 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2706 drbd_bm_total_weight(mdev));
2713 /* returns 1 if invalid */
2714 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2716 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2717 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2718 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2721 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2722 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2723 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2726 /* everything else is valid if they are equal on both sides. */
2730 /* everything else is invalid. */
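/*
 * Illustrative truth table for cmp_after_sb() (example, not original):
 *
 *	self            peer            result
 *	DISCARD_LOCAL   DISCARD_REMOTE  valid   (complementary pair)
 *	DISCARD_REMOTE  DISCARD_LOCAL   valid   (complementary pair)
 *	DISCARD_LOCAL   DISCARD_LOCAL   invalid (both would discard their own data)
 *	DISCONNECT      DISCONNECT      valid   (equal)
 *	DISCONNECT      CALL_HELPER     invalid (unequal)
 */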
2734 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2735 unsigned int data_size)
2737 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
2738 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2739 int p_want_lose, p_two_primaries, cf;
2740 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2742 p_proto = be32_to_cpu(p->protocol);
2743 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2744 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2745 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2746 p_two_primaries = be32_to_cpu(p->two_primaries);
2747 cf = be32_to_cpu(p->conn_flags);
2748 p_want_lose = cf & CF_WANT_LOSE;
2750 clear_bit(CONN_DRY_RUN, &mdev->flags);
2752 if (cf & CF_DRY_RUN)
2753 set_bit(CONN_DRY_RUN, &mdev->flags);
2755 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
2756 dev_err(DEV, "incompatible communication protocols\n");
2760 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
2761 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2765 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
2766 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2770 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
2771 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2775 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
2776 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2780 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
2781 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2785 if (mdev->tconn->agreed_pro_version >= 87) {
2786 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
2788 if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
2791 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2792 if (strcmp(p_integrity_alg, my_alg)) {
2793 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2796 dev_info(DEV, "data-integrity-alg: %s\n",
2797 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2803 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2808 * input: alg name, feature name
2809 * return: NULL (alg name was "")
2810 * ERR_PTR(error) if something goes wrong
2811 * or the crypto hash ptr, if it worked out ok. */
2812 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2813 const char *alg, const char *name)
2815 struct crypto_hash *tfm;
2820 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2822 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2823 alg, name, PTR_ERR(tfm));
2826 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2827 crypto_free_hash(tfm);
2828 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2829 return ERR_PTR(-EINVAL);
2834 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2835 unsigned int packet_size)
2838 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
2839 unsigned int header_size, data_size, exp_max_sz;
2840 struct crypto_hash *verify_tfm = NULL;
2841 struct crypto_hash *csums_tfm = NULL;
2842 const int apv = mdev->tconn->agreed_pro_version;
2843 int *rs_plan_s = NULL;
2846 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2847 : apv == 88 ? sizeof(struct p_rs_param)
2849 : apv <= 94 ? sizeof(struct p_rs_param_89)
2850 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2852 if (packet_size > exp_max_sz) {
2853 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2854 packet_size, exp_max_sz);
2859 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
2860 data_size = packet_size - header_size;
2861 } else if (apv <= 94) {
2862 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
2863 data_size = packet_size - header_size;
2864 D_ASSERT(data_size == 0);
2866 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
2867 data_size = packet_size - header_size;
2868 D_ASSERT(data_size == 0);
2871 /* initialize verify_alg and csums_alg */
2872 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2874 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
2877 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2881 if (data_size > SHARED_SECRET_MAX) {
2882 dev_err(DEV, "verify-alg too long, "
2883 "peer wants %u, accepting only %u byte\n",
2884 data_size, SHARED_SECRET_MAX);
2888 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
2891 /* we expect NUL terminated string */
2892 /* but just in case someone tries to be evil */
2893 D_ASSERT(p->verify_alg[data_size-1] == 0);
2894 p->verify_alg[data_size-1] = 0;
2896 } else /* apv >= 89 */ {
2897 /* we still expect NUL terminated strings */
2898 /* but just in case someone tries to be evil */
2899 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2900 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2901 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2902 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2905 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2906 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2907 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2908 mdev->sync_conf.verify_alg, p->verify_alg);
2911 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2912 p->verify_alg, "verify-alg");
2913 if (IS_ERR(verify_tfm)) {
2919 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2920 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2921 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2922 mdev->sync_conf.csums_alg, p->csums_alg);
2925 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2926 p->csums_alg, "csums-alg");
2927 if (IS_ERR(csums_tfm)) {
2934 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2935 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2936 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2937 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2938 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2940 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2941 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2942 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2944 dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2950 spin_lock(&mdev->peer_seq_lock);
2951 /* lock against drbd_nl_syncer_conf() */
2953 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2954 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2955 crypto_free_hash(mdev->verify_tfm);
2956 mdev->verify_tfm = verify_tfm;
2957 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2960 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2961 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2962 crypto_free_hash(mdev->csums_tfm);
2963 mdev->csums_tfm = csums_tfm;
2964 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2966 if (fifo_size != mdev->rs_plan_s.size) {
2967 kfree(mdev->rs_plan_s.values);
2968 mdev->rs_plan_s.values = rs_plan_s;
2969 mdev->rs_plan_s.size = fifo_size;
2970 mdev->rs_planed = 0;
2972 spin_unlock(&mdev->peer_seq_lock);
2977 /* just for completeness: actually not needed,
2978 * as this is not reached if csums_tfm was ok. */
2979 crypto_free_hash(csums_tfm);
2980 /* but free the verify_tfm again, if csums_tfm did not work out */
2981 crypto_free_hash(verify_tfm);
2982 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2986 /* warn if the arguments differ by more than 12.5% */
2987 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2988 const char *s, sector_t a, sector_t b)
2991 if (a == 0 || b == 0)
2993 d = (a > b) ? (a - b) : (b - a);
2994 if (d > (a>>3) || d > (b>>3))
2995 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2996 (unsigned long long)a, (unsigned long long)b);
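/*
 * Worked example (illustrative): with a = 1000 and b = 1200 sectors,
 * d = 200, and since d > (a >> 3) = 125, the sizes differ by more than
 * an eighth (12.5%) and the warning above fires.
 */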
2999 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3000 unsigned int data_size)
3002 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
3003 enum determine_dev_size dd = unchanged;
3004 sector_t p_size, p_usize, my_usize;
3005 int ldsc = 0; /* local disk size changed */
3006 enum dds_flags ddsf;
3008 p_size = be64_to_cpu(p->d_size);
3009 p_usize = be64_to_cpu(p->u_size);
3011 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3012 dev_err(DEV, "some backing storage is needed\n");
3013 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3017 /* just store the peer's disk size for now.
3018 * we still need to figure out whether we accept that. */
3019 mdev->p_size = p_size;
3021 if (get_ldev(mdev)) {
3022 warn_if_differ_considerably(mdev, "lower level device sizes",
3023 p_size, drbd_get_max_capacity(mdev->ldev));
3024 warn_if_differ_considerably(mdev, "user requested size",
3025 p_usize, mdev->ldev->dc.disk_size);
3027 /* if this is the first connect, or an otherwise expected
3028 * param exchange, choose the minimum */
3029 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3030 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3033 my_usize = mdev->ldev->dc.disk_size;
3035 if (mdev->ldev->dc.disk_size != p_usize) {
3036 mdev->ldev->dc.disk_size = p_usize;
3037 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3038 (unsigned long)mdev->ldev->dc.disk_size);
3041 /* Never shrink a device with usable data during connect.
3042 But allow online shrinking if we are connected. */
3043 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3044 drbd_get_capacity(mdev->this_bdev) &&
3045 mdev->state.disk >= D_OUTDATED &&
3046 mdev->state.conn < C_CONNECTED) {
3047 dev_err(DEV, "The peer's disk size is too small!\n");
3048 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3049 mdev->ldev->dc.disk_size = my_usize;
3056 ddsf = be16_to_cpu(p->dds_flags);
3057 if (get_ldev(mdev)) {
3058 dd = drbd_determine_dev_size(mdev, ddsf);
3060 if (dd == dev_size_error)
3064 /* I am diskless, need to accept the peer's size. */
3065 drbd_set_my_capacity(mdev, p_size);
3068 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3069 drbd_reconsider_max_bio_size(mdev);
3071 if (get_ldev(mdev)) {
3072 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3073 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3080 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3081 if (be64_to_cpu(p->c_size) !=
3082 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3083 /* we have different sizes, probably the peer
3084 * needs to know my new size... */
3085 drbd_send_sizes(mdev, 0, ddsf);
3087 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3088 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3089 if (mdev->state.pdsk >= D_INCONSISTENT &&
3090 mdev->state.disk >= D_INCONSISTENT) {
3091 if (ddsf & DDSF_NO_RESYNC)
3092 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3094 resync_after_online_grow(mdev);
3096 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3103 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3104 unsigned int data_size)
3106 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
3108 int i, updated_uuids = 0;
3110 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3112 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3113 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3115 kfree(mdev->p_uuid);
3116 mdev->p_uuid = p_uuid;
3118 if (mdev->state.conn < C_CONNECTED &&
3119 mdev->state.disk < D_INCONSISTENT &&
3120 mdev->state.role == R_PRIMARY &&
3121 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3122 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3123 (unsigned long long)mdev->ed_uuid);
3124 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3128 if (get_ldev(mdev)) {
3129 int skip_initial_sync =
3130 mdev->state.conn == C_CONNECTED &&
3131 mdev->tconn->agreed_pro_version >= 90 &&
3132 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3133 (p_uuid[UI_FLAGS] & 8);
3134 if (skip_initial_sync) {
3135 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3136 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3137 "clear_n_write from receive_uuids",
3138 BM_LOCKED_TEST_ALLOWED);
3139 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3140 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3141 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3147 } else if (mdev->state.disk < D_INCONSISTENT &&
3148 mdev->state.role == R_PRIMARY) {
3149 /* I am a diskless primary, the peer just created a new current UUID
3151 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3154 /* Before we test for the disk state, we should wait until any possibly
3155 ongoing cluster wide state change has finished. That is important if
3156 we are primary and are detaching from our disk. We need to see the
3157 new disk state... */
3158 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3159 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3160 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3163 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3169 * convert_state() - Converts the peer's view of the cluster state to our point of view
3170 * @ps: The state as seen by the peer.
3172 static union drbd_state convert_state(union drbd_state ps)
3174 union drbd_state ms;
3176 static enum drbd_conns c_tab[] = {
3177 [C_CONNECTED] = C_CONNECTED,
3179 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3180 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3181 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3182 [C_VERIFY_S] = C_VERIFY_T,
3188 ms.conn = c_tab[ps.conn];
3193 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3198 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3199 unsigned int data_size)
3201 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
3202 union drbd_state mask, val;
3203 enum drbd_state_rv rv;
3205 mask.i = be32_to_cpu(p->mask);
3206 val.i = be32_to_cpu(p->val);
3208 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3209 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3210 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3214 mask = convert_state(mask);
3215 val = convert_state(val);
3217 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3219 drbd_send_sr_reply(mdev, rv);
3225 static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3226 unsigned int data_size)
3228 struct p_state *p = &mdev->tconn->data.rbuf.state;
3229 union drbd_state os, ns, peer_state;
3230 enum drbd_disk_state real_peer_disk;
3231 enum chg_state_flags cs_flags;
3234 peer_state.i = be32_to_cpu(p->state);
3236 real_peer_disk = peer_state.disk;
3237 if (peer_state.disk == D_NEGOTIATING) {
3238 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3239 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3242 spin_lock_irq(&mdev->tconn->req_lock);
3244 os = ns = mdev->state;
3245 spin_unlock_irq(&mdev->tconn->req_lock);
3247 /* peer says his disk is uptodate, while we think it is inconsistent,
3248 * and this happens while we think we have a sync going on. */
3249 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3250 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3251 /* If we are (becoming) SyncSource, but peer is still in sync
3252 * preparation, ignore its uptodate-ness to avoid flapping, it
3253 * will change to inconsistent once the peer reaches active
3255 * It may have changed syncer-paused flags, however, so we
3256 * cannot ignore this completely. */
3257 if (peer_state.conn > C_CONNECTED &&
3258 peer_state.conn < C_SYNC_SOURCE)
3259 real_peer_disk = D_INCONSISTENT;
3261 /* if peer_state changes to connected at the same time,
3262 * it explicitly notifies us that it finished resync.
3263 * Maybe we should finish it up, too? */
3264 else if (os.conn >= C_SYNC_SOURCE &&
3265 peer_state.conn == C_CONNECTED) {
3266 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3267 drbd_resync_finished(mdev);
3272 /* peer says his disk is inconsistent, while we think it is uptodate,
3273 * and this happens while the peer still thinks we have a sync going on,
3274 * but we think we are already done with the sync.
3275 * We ignore this to avoid flapping pdsk.
3276 * This should not happen, if the peer is a recent version of drbd. */
3277 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3278 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3279 real_peer_disk = D_UP_TO_DATE;
3281 if (ns.conn == C_WF_REPORT_PARAMS)
3282 ns.conn = C_CONNECTED;
3284 if (peer_state.conn == C_AHEAD)
3287 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3288 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3289 int cr; /* consider resync */
3291 /* if we established a new connection */
3292 cr = (os.conn < C_CONNECTED);
3293 /* if we had an established connection
3294 * and one of the nodes newly attaches a disk */
3295 cr |= (os.conn == C_CONNECTED &&
3296 (peer_state.disk == D_NEGOTIATING ||
3297 os.disk == D_NEGOTIATING));
3298 /* if we have both been inconsistent, and the peer has been
3299 * forced to be UpToDate with --overwrite-data */
3300 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3301 /* if we had been plain connected, and the admin requested to
3302 * start a sync by "invalidate" or "invalidate-remote" */
3303 cr |= (os.conn == C_CONNECTED &&
3304 (peer_state.conn >= C_STARTING_SYNC_S &&
3305 peer_state.conn <= C_WF_BITMAP_T));
3308 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3311 if (ns.conn == C_MASK) {
3312 ns.conn = C_CONNECTED;
3313 if (mdev->state.disk == D_NEGOTIATING) {
3314 drbd_force_state(mdev, NS(disk, D_FAILED));
3315 } else if (peer_state.disk == D_NEGOTIATING) {
3316 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3317 peer_state.disk = D_DISKLESS;
3318 real_peer_disk = D_DISKLESS;
3320 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3322 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3323 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3329 spin_lock_irq(&mdev->tconn->req_lock);
3330 if (mdev->state.i != os.i)
3332 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3333 ns.peer = peer_state.role;
3334 ns.pdsk = real_peer_disk;
3335 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3336 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3337 ns.disk = mdev->new_state_tmp.disk;
3338 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3339 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3340 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3341 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3342 for temporary network outages! */
3343 spin_unlock_irq(&mdev->tconn->req_lock);
3344 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3346 drbd_uuid_new_current(mdev);
3347 clear_bit(NEW_CUR_UUID, &mdev->flags);
3348 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3351 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3353 spin_unlock_irq(&mdev->tconn->req_lock);
3355 if (rv < SS_SUCCESS) {
3356 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3360 if (os.conn > C_WF_REPORT_PARAMS) {
3361 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3362 peer_state.disk != D_NEGOTIATING) {
3363 /* we want resync, peer has not yet decided to sync... */
3364 /* Nowadays only used when forcing a node into primary role and
3365 setting its disk to UpToDate with that */
3366 drbd_send_uuids(mdev);
3367 drbd_send_state(mdev);
3371 mdev->tconn->net_conf->want_lose = 0;
3373 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3378 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3379 unsigned int data_size)
3381 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
3383 wait_event(mdev->misc_wait,
3384 mdev->state.conn == C_WF_SYNC_UUID ||
3385 mdev->state.conn == C_BEHIND ||
3386 mdev->state.conn < C_CONNECTED ||
3387 mdev->state.disk < D_NEGOTIATING);
3389 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3391 /* Here the _drbd_uuid_ functions are right, current should
3392 _not_ be rotated into the history */
3393 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3394 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3395 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3397 drbd_print_uuids(mdev, "updated sync uuid");
3398 drbd_start_resync(mdev, C_SYNC_TARGET);
3402 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3408 * receive_bitmap_plain
3410 * Return 0 when done, 1 when another iteration is needed, and a negative error
3411 * code upon failure.
3414 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3415 unsigned long *buffer, struct bm_xfer_ctx *c)
3417 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3418 unsigned want = num_words * sizeof(long);
3421 if (want != data_size) {
3422 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3427 err = drbd_recv(mdev->tconn, buffer, want);
3434 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3436 c->word_offset += num_words;
3437 c->bit_offset = c->word_offset * BITS_PER_LONG;
3438 if (c->bit_offset > c->bm_bits)
3439 c->bit_offset = c->bm_bits;
3447 * Return 0 when done, 1 when another iteration is needed, and a negative error
3448 * code upon failure.
3451 recv_bm_rle_bits(struct drbd_conf *mdev,
3452 struct p_compressed_bm *p,
3453 struct bm_xfer_ctx *c,
3456 struct bitstream bs;
3460 unsigned long s = c->bit_offset;
3462 int toggle = DCBP_get_start(p);
3466 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3468 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3472 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3473 bits = vli_decode_bits(&rl, look_ahead);
3479 if (e >= c->bm_bits) {
3480 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3483 _drbd_bm_set_bits(mdev, s, e);
3487 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3488 have, bits, look_ahead,
3489 (unsigned int)(bs.cur.b - p->code),
3490 (unsigned int)bs.buf_len);
3493 look_ahead >>= bits;
3496 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3499 look_ahead |= tmp << have;
3504 bm_xfer_ctx_bit_to_word_offset(c);
3506 return (s != c->bm_bits);
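/*
 * Decoding example (illustrative, not from the original source): with
 * DCBP_get_start(p) == 0, bit_offset 0 and decoded run lengths 5, 3, 2,
 * bits 0..4 stay clear (toggle is 0), bits 5..7 are set via
 * _drbd_bm_set_bits(), and bits 8..9 stay clear again.  Runs strictly
 * alternate between clear and set, so only the start flag plus the run
 * lengths have to be transmitted.
 */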
3512 * Return 0 when done, 1 when another iteration is needed, and a negative error
3513 * code upon failure.
3516 decode_bitmap_c(struct drbd_conf *mdev,
3517 struct p_compressed_bm *p,
3518 struct bm_xfer_ctx *c,
3521 if (DCBP_get_code(p) == RLE_VLI_Bits)
3522 return recv_bm_rle_bits(mdev, p, c, len);
3524 /* other variants had been implemented for evaluation,
3525 * but have been dropped as this one turned out to be "best"
3526 * during all our tests. */
3528 dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3529 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3533 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3534 const char *direction, struct bm_xfer_ctx *c)
3536 /* what would it take to transfer it "plaintext" */
3537 unsigned plain = sizeof(struct p_header) *
3538 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3539 + c->bm_words * sizeof(long);
3540 unsigned total = c->bytes[0] + c->bytes[1];
3543 /* total can not be zero. but just in case: */
3547 /* don't report if not compressed */
3551 /* total < plain. check for overflow, still */
3552 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3553 : (1000 * total / plain);
3559 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3560 "total %u; compression: %u.%u%%\n",
3562 c->bytes[1], c->packets[1],
3563 c->bytes[0], c->packets[0],
3564 total, r/10, r % 10);
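/*
 * Worked example (illustrative): if the compressed transfer moved
 * total = 4096 bytes where the plain bitmap would have taken
 * plain = 65536 bytes, then r = 1000 * 4096 / 65536 = 62 and the
 * message above reports "compression: 6.2%", i.e. the RLE stream is
 * 6.2% of the plaintext size.
 */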
3567 /* Since we are processing the bitfield from lower addresses to higher,
3568 it does not matter whether we process it in 32 bit chunks or 64 bit
3569 chunks, as long as it is little endian. (Think of it as a byte stream,
3570 beginning with the lowest byte...) If we used big endian,
3571 we would need to process it from the highest address to the lowest,
3572 in order to be agnostic to the 32 vs 64 bit issue.
3574 returns 0 on failure, 1 if we successfully received it. */
3575 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3576 unsigned int data_size)
3578 struct bm_xfer_ctx c;
3582 struct p_header *h = &mdev->tconn->data.rbuf.header;
3584 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3585 /* you are supposed to send additional out-of-sync information
3586 * if you actually set bits during this phase */
3588 /* maybe we should use some per thread scratch page,
3589 * and allocate that during initial device creation? */
3590 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3592 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3596 c = (struct bm_xfer_ctx) {
3597 .bm_bits = drbd_bm_bits(mdev),
3598 .bm_words = drbd_bm_words(mdev),
3602 if (cmd == P_BITMAP) {
3603 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3604 } else if (cmd == P_COMPRESSED_BITMAP) {
3605 /* MAYBE: sanity check that we speak proto >= 90,
3606 * and the feature is enabled! */
3607 struct p_compressed_bm *p;
3609 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3610 dev_err(DEV, "ReportCBitmap packet too large\n");
3613 /* use the page buff */
3615 memcpy(p, h, sizeof(*h));
3616 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
3618 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3619 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3622 err = decode_bitmap_c(mdev, p, &c, data_size);
3624 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3628 c.packets[cmd == P_BITMAP]++;
3629 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
3636 if (!drbd_recv_header(mdev, &cmd, &data_size))
3640 INFO_bm_xfer_stats(mdev, "receive", &c);
3642 if (mdev->state.conn == C_WF_BITMAP_T) {
3643 enum drbd_state_rv rv;
3645 ok = !drbd_send_bitmap(mdev);
3648 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3649 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3650 D_ASSERT(rv == SS_SUCCESS);
3651 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3652 /* admin may have requested C_DISCONNECTING,
3653 * other threads may have noticed network errors */
3654 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3655 drbd_conn_str(mdev->state.conn));
3660 drbd_bm_unlock(mdev);
3661 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3662 drbd_start_resync(mdev, C_SYNC_SOURCE);
3663 free_page((unsigned long) buffer);
3667 static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3668 unsigned int data_size)
3670 /* TODO zero copy sink :) */
3671 static char sink[128];
3674 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3679 want = min_t(int, size, sizeof(sink));
3680 r = drbd_recv(mdev->tconn, sink, want);
3688 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3689 unsigned int data_size)
3691 /* Make sure we've acked all the TCP data associated
3692 * with the data requests being unplugged */
3693 drbd_tcp_quickack(mdev->tconn->data.socket);
3698 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3699 unsigned int data_size)
3701 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
3703 switch (mdev->state.conn) {
3704 case C_WF_SYNC_UUID:
3709 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3710 drbd_conn_str(mdev->state.conn));
3713 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3718 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3719 unsigned int to_receive);
3724 drbd_cmd_handler_f function;
3727 static struct data_cmd drbd_cmd_handler[] = {
3728 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3729 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3730 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3731 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3732 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3733 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3734 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
3735 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3736 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3737 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3738 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
3739 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3740 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3741 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3742 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3743 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3744 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3745 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3746 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3747 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3748 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3749 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3750 /* anything missing from this table is in
3751 * the asender_tbl, see get_asender_cmd */
3752 [P_MAX_CMD] = { 0, 0, NULL },
3755 /* All handler functions that expect a sub-header get that sub-header in
3756 mdev->tconn->data.rbuf.header.head.payload.
3758 Usually the callback can find the usual p_header in mdev->tconn->data.rbuf.header.head,
3759 but it may not rely on that, since there is also p_header95.
3762 static void drbdd(struct drbd_conf *mdev)
3764 struct p_header *header = &mdev->tconn->data.rbuf.header;
3765 unsigned int packet_size;
3766 enum drbd_packet cmd;
3767 size_t shs; /* sub header size */
3770 while (get_t_state(&mdev->tconn->receiver) == RUNNING) {
3771 drbd_thread_current_set_cpu(mdev, &mdev->tconn->receiver);
3772 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3775 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3776 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3780 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(struct p_header);
3781 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3782 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3787 rv = drbd_recv(mdev->tconn, &header->payload, shs);
3788 if (unlikely(rv != shs)) {
3789 if (!signal_pending(current))
3790 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3795 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3797 if (unlikely(!rv)) {
3798 dev_err(DEV, "error receiving %s, l: %d!\n",
3799 cmdname(cmd), packet_size);
3806 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3808 /* If we leave here, we probably want to update at least the
3809 * "Connected" indicator on stable storage. Do so explicitly here. */
3813 void drbd_flush_workqueue(struct drbd_tconn *tconn)
3815 struct drbd_wq_barrier barr;
3817 barr.w.cb = w_prev_work_done;
3818 init_completion(&barr.done);
3819 drbd_queue_work(&tconn->data.work, &barr.w);
3820 wait_for_completion(&barr.done);
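/*
 * Usage note (illustrative): this is the classic "barrier work item"
 * pattern -- queue a work item whose only job is to complete() a
 * completion, then sleep on it.  Because the worker processes its queue
 * in order, all work queued before a call like
 *
 *	drbd_flush_workqueue(mdev->tconn);
 *
 * is guaranteed to have finished by the time the call returns.
 */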
3823 static void drbd_disconnect(struct drbd_conf *mdev)
3825 enum drbd_fencing_p fp;
3826 union drbd_state os, ns;
3827 int rv = SS_UNKNOWN_ERROR;
3830 if (mdev->state.conn == C_STANDALONE)
3833 /* asender does not clean up anything. it must not interfere, either */
3834 drbd_thread_stop(&mdev->tconn->asender);
3835 drbd_free_sock(mdev);
3837 /* wait for current activity to cease. */
3838 spin_lock_irq(&mdev->tconn->req_lock);
3839 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3840 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3841 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3842 spin_unlock_irq(&mdev->tconn->req_lock);
3844 /* We do not have data structures that would allow us to
3845 * get the rs_pending_cnt down to 0 again.
3846 * * On C_SYNC_TARGET we do not have any data structures describing
3847 * the pending RSDataRequest's we have sent.
3848 * * On C_SYNC_SOURCE there is no data structure that tracks
3849 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3850 * And no, it is not the sum of the reference counts in the
3851 * resync_LRU. The resync_LRU tracks the whole operation including
3852 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3854 drbd_rs_cancel_all(mdev);
3856 mdev->rs_failed = 0;
3857 atomic_set(&mdev->rs_pending_cnt, 0);
3858 wake_up(&mdev->misc_wait);
3860 del_timer(&mdev->request_timer);
3862 /* make sure syncer is stopped and w_resume_next_sg queued */
3863 del_timer_sync(&mdev->resync_timer);
3864 resync_timer_fn((unsigned long)mdev);
3866 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3867 * w_make_resync_request etc. which may still be on the worker queue
3868 * to be "canceled" */
3869 drbd_flush_workqueue(mdev->tconn);
3871 /* This also does reclaim_net_ee(). If we do this too early, we might
3872 * miss some resync ee and pages.*/
3873 drbd_process_done_ee(mdev);
3875 kfree(mdev->p_uuid);
3876 mdev->p_uuid = NULL;
3878 if (!is_susp(mdev->state))
3881 dev_info(DEV, "Connection closed\n");
3886 if (get_ldev(mdev)) {
3887 fp = mdev->ldev->dc.fencing;
3891 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3892 drbd_try_outdate_peer_async(mdev);
3894 spin_lock_irq(&mdev->tconn->req_lock);
3896 if (os.conn >= C_UNCONNECTED) {
3897 /* Do not restart in case we are C_DISCONNECTING */
3899 ns.conn = C_UNCONNECTED;
3900 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3902 spin_unlock_irq(&mdev->tconn->req_lock);
3904 if (os.conn == C_DISCONNECTING) {
3905 wait_event(mdev->tconn->net_cnt_wait, atomic_read(&mdev->tconn->net_cnt) == 0);
3907 crypto_free_hash(mdev->tconn->cram_hmac_tfm);
3908 mdev->tconn->cram_hmac_tfm = NULL;
3910 kfree(mdev->tconn->net_conf);
3911 mdev->tconn->net_conf = NULL;
3912 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3915 /* serialize with bitmap writeout triggered by the state change,
3917 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3919 /* tcp_close and release of sendpage pages can be deferred. I don't
3920 * want to use SO_LINGER, because apparently it can be deferred for
3921 * more than 20 seconds (longest time I checked).
3923 * Actually we don't care for exactly when the network stack does its
3924 * put_page(), but release our reference on these pages right here.
3926 i = drbd_release_ee(mdev, &mdev->net_ee);
3928 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3929 i = atomic_read(&mdev->pp_in_use_by_net);
3931 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3932 i = atomic_read(&mdev->pp_in_use);
3934 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3936 D_ASSERT(list_empty(&mdev->read_ee));
3937 D_ASSERT(list_empty(&mdev->active_ee));
3938 D_ASSERT(list_empty(&mdev->sync_ee));
3939 D_ASSERT(list_empty(&mdev->done_ee));
3941 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3942 atomic_set(&mdev->current_epoch->epoch_size, 0);
3943 D_ASSERT(list_empty(&mdev->current_epoch->list));
3947 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3948 * we can agree on is stored in agreed_pro_version.
3950 * feature flags and the reserved array should be enough room for future
3951 * enhancements of the handshake protocol, and possible plugins...
3953 * for now, they are expected to be zero, but ignored.
3955 static int drbd_send_handshake(struct drbd_tconn *tconn)
3957 /* ASSERT current == mdev->tconn->receiver ... */
3958 struct p_handshake *p = &tconn->data.sbuf.handshake;
3961 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3962 conn_err(tconn, "interrupted during initial handshake\n");
3963 return 0; /* interrupted. not ok. */
3966 if (tconn->data.socket == NULL) {
3967 mutex_unlock(&tconn->data.mutex);
3971 memset(p, 0, sizeof(*p));
3972 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3973 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3974 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
3975 &p->head, sizeof(*p), 0);
3976 mutex_unlock(&tconn->data.mutex);
3982 * 1 yes, we have a valid connection
3983 * 0 oops, did not work out, please try again
3984 * -1 peer talks different language,
3985 * no point in trying again, please go standalone.
3987 static int drbd_do_handshake(struct drbd_conf *mdev)
3989 /* ASSERT current == mdev->tconn->receiver ... */
3990 struct p_handshake *p = &mdev->tconn->data.rbuf.handshake;
3991 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3992 unsigned int length;
3993 enum drbd_packet cmd;
3996 rv = drbd_send_handshake(mdev->tconn);
4000 rv = drbd_recv_header(mdev, &cmd, &length);
4004 if (cmd != P_HAND_SHAKE) {
4005 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4010 if (length != expect) {
4011 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4016 rv = drbd_recv(mdev->tconn, &p->head.payload, expect);
4019 if (!signal_pending(current))
4020 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4024 p->protocol_min = be32_to_cpu(p->protocol_min);
4025 p->protocol_max = be32_to_cpu(p->protocol_max);
4026 if (p->protocol_max == 0)
4027 p->protocol_max = p->protocol_min;
4029 if (PRO_VERSION_MAX < p->protocol_min ||
4030 PRO_VERSION_MIN > p->protocol_max)
4033 mdev->tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4035 dev_info(DEV, "Handshake successful: "
4036 "Agreed network protocol version %d\n", mdev->tconn->agreed_pro_version);
4041 dev_err(DEV, "incompatible DRBD dialects: "
4042 "I support %d-%d, peer supports %d-%d\n",
4043 PRO_VERSION_MIN, PRO_VERSION_MAX,
4044 p->protocol_min, p->protocol_max);
4048 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4049 static int drbd_do_auth(struct drbd_conf *mdev)
4051 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4052 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4056 #define CHALLENGE_LEN 64
4060 0 - failed, try again (network error),
4061 -1 - auth failed, don't try again.
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4066 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4067 struct scatterlist sg;
4068 char *response = NULL;
4069 char *right_response = NULL;
4070 char *peers_ch = NULL;
4071 unsigned int key_len = strlen(mdev->tconn->net_conf->shared_secret);
4072 unsigned int resp_size;
4073 struct hash_desc desc;
4074 enum drbd_packet cmd;
4075 unsigned int length;
4078 desc.tfm = mdev->tconn->cram_hmac_tfm;
4081 rv = crypto_hash_setkey(mdev->tconn->cram_hmac_tfm,
4082 (u8 *)mdev->tconn->net_conf->shared_secret, key_len);
4084 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4089 get_random_bytes(my_challenge, CHALLENGE_LEN);
4091 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4095 rv = drbd_recv_header(mdev, &cmd, &length);
4099 if (cmd != P_AUTH_CHALLENGE) {
4100 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4106 if (length > CHALLENGE_LEN * 2) {
4107 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4112 peers_ch = kmalloc(length, GFP_NOIO);
4113 if (peers_ch == NULL) {
4114 dev_err(DEV, "kmalloc of peers_ch failed\n");
4119 rv = drbd_recv(mdev->tconn, peers_ch, length);
4122 if (!signal_pending(current))
4123 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4128 resp_size = crypto_hash_digestsize(mdev->tconn->cram_hmac_tfm);
4129 response = kmalloc(resp_size, GFP_NOIO);
4130 if (response == NULL) {
4131 dev_err(DEV, "kmalloc of response failed\n");
4136 sg_init_table(&sg, 1);
4137 sg_set_buf(&sg, peers_ch, length);
4139 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4141 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4146 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4150 rv = drbd_recv_header(mdev, &cmd, &length);
4154 if (cmd != P_AUTH_RESPONSE) {
4155 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4161 if (length != resp_size) {
4162 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4167 rv = drbd_recv(mdev->tconn, response , resp_size);
4169 if (rv != resp_size) {
4170 if (!signal_pending(current))
4171 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4176 right_response = kmalloc(resp_size, GFP_NOIO);
4177 if (right_response == NULL) {
4178 dev_err(DEV, "kmalloc of right_response failed\n");
4183 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4185 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4187 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4192 rv = !memcmp(response, right_response, resp_size);
4195 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4196 resp_size, mdev->tconn->net_conf->cram_hmac_alg);
4203 kfree(right_response);
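/*
 * Protocol recap (illustrative summary of the function above): both
 * peers run the same symmetric challenge-response exchange:
 *
 *	1. send P_AUTH_CHALLENGE carrying CHALLENGE_LEN random bytes;
 *	2. receive the peer's challenge, HMAC it with the shared secret
 *	   and send the digest back as P_AUTH_RESPONSE;
 *	3. receive the peer's response and memcmp() it against the HMAC
 *	   computed over our own challenge.
 *
 * The shared secret itself never crosses the wire; only HMAC digests of
 * fresh random challenges do.
 */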
4209 int drbdd_init(struct drbd_thread *thi)
4211 struct drbd_conf *mdev = thi->mdev;
4212 unsigned int minor = mdev_to_minor(mdev);
4215 sprintf(current->comm, "drbd%d_receiver", minor);
4217 dev_info(DEV, "receiver (re)started\n");
4220 h = drbd_connect(mdev);
4222 drbd_disconnect(mdev);
4223 schedule_timeout_interruptible(HZ);
4226 dev_warn(DEV, "Discarding network configuration.\n");
4227 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4232 if (get_net_conf(mdev->tconn)) {
4234 put_net_conf(mdev->tconn);
4238 drbd_disconnect(mdev);
4240 dev_info(DEV, "receiver terminated\n");
4244 /* ********* acknowledge sender ******** */
4246 static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4248 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
4250 int retcode = be32_to_cpu(p->retcode);
4252 if (retcode >= SS_SUCCESS) {
4253 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4255 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4256 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4257 drbd_set_st_err_str(retcode), retcode);
4259 wake_up(&mdev->state_wait);
4264 static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
4266 return drbd_send_ping_ack(mdev);
4270 static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4272 /* restore idle timeout */
4273 mdev->tconn->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
4274 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4275 wake_up(&mdev->misc_wait);
4280 static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
4282 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4283 sector_t sector = be64_to_cpu(p->sector);
4284 int blksize = be32_to_cpu(p->blksize);
4286 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4288 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4290 if (get_ldev(mdev)) {
4291 drbd_rs_complete_io(mdev, sector);
4292 drbd_set_in_sync(mdev, sector, blksize);
4293 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4294 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4297 dec_rs_pending(mdev);
4298 atomic_add(blksize >> 9, &mdev->rs_sect_in);
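/*
 * Unit arithmetic (illustrative): blksize is in bytes, so blksize >> 9
 * converts to 512-byte sectors and blksize >> BM_BLOCK_SHIFT to bitmap
 * bits.  Assuming the usual 4 KiB bitmap granularity (BM_BLOCK_SHIFT
 * being 12), a 4096-byte block accounts for 8 sectors and exactly one
 * bitmap bit.
 */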
4304 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4305 struct rb_root *root, const char *func,
4306 enum drbd_req_event what, bool missing_ok)
4308 struct drbd_request *req;
4309 struct bio_and_error m;
4311 spin_lock_irq(&mdev->tconn->req_lock);
4312 req = find_request(mdev, root, id, sector, missing_ok, func);
4313 if (unlikely(!req)) {
4314 spin_unlock_irq(&mdev->tconn->req_lock);
4317 __req_mod(req, what, &m);
4318 spin_unlock_irq(&mdev->tconn->req_lock);
4321 complete_master_bio(mdev, &m);
4325 static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4327 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4328 sector_t sector = be64_to_cpu(p->sector);
4329 int blksize = be32_to_cpu(p->blksize);
4330 enum drbd_req_event what;
4332 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4334 if (p->block_id == ID_SYNCER) {
4335 drbd_set_in_sync(mdev, sector, blksize);
4336 dec_rs_pending(mdev);
4340 case P_RS_WRITE_ACK:
4341 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4342 what = WRITE_ACKED_BY_PEER_AND_SIS;
4345 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4346 what = WRITE_ACKED_BY_PEER;
4349 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4350 what = RECV_ACKED_BY_PEER;
4353 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4354 what = CONFLICT_DISCARDED_BY_PEER;
4361 return validate_req_change_req_state(mdev, p->block_id, sector,
4362 &mdev->write_requests, __func__,
4366 static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4368 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4369 sector_t sector = be64_to_cpu(p->sector);
4370 int size = be32_to_cpu(p->blksize);
4371 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4372 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
4375 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377 if (p->block_id == ID_SYNCER) {
4378 dec_rs_pending(mdev);
4379 drbd_rs_failed_io(mdev, sector, size);
4383 found = validate_req_change_req_state(mdev, p->block_id, sector,
4384 &mdev->write_requests, __func__,
4385 NEG_ACKED, missing_ok);
4387 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4388 The master bio might already be completed, therefore the
4389 request is no longer in the collision hash. */
4390 /* In Protocol B we might already have got a P_RECV_ACK
4391 but then get a P_NEG_ACK afterwards. */
4394 drbd_set_out_of_sync(mdev, sector, size);
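/*
 * Background (illustrative recap): protocol A sends no write acks at
 * all, and protocol B only P_RECV_ACKs, so by the time a P_NEG_ACK
 * arrives the request may legitimately be gone already -- hence
 * missing_ok above.  In protocol C every write is acknowledged, so a
 * missing request would indicate a real inconsistency.
 */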
static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}
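/* Negative reply to a resync read, also reused for P_RS_CANCEL: both
 * complete the resync I/O, but only P_NEG_RS_DREPLY marks the sectors
 * as failed (note the deliberate switch fall-through below). */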
static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}
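/* A barrier ack closes an epoch on the peer; tl_release() then retires
 * the corresponding requests from our transfer log. When in Ahead mode
 * with no application I/O in flight, this is also the point where we
 * schedule the transition back to sync source. */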
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}
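/* Online-verify result for one block. ov_left tracks the remaining
 * verify work; once it reaches zero we queue w_ov_finished, falling
 * back to a synchronous drbd_resync_finished() if even the tiny work
 * item cannot be allocated with GFP_NOIO. */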
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}
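/* Sink for control packets we only need to receive and discard on the
 * meta socket, e.g. P_DELAY_PROBE. */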
static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}
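/* Dispatch table entry for the asender: the expected on-wire size of
 * the packet (header included) and the handler that processes it. */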
struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
};
static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}
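/* The asender ("ack sender/receiver") thread owns the meta socket: it
 * sends pings and the acks for completed epoch entries, and receives
 * the small control packets listed in asender_tbl above. Bulk data
 * travels on the data socket, handled by the receiver thread. */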
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct p_header *h = &mdev->tconn->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;

	int rv;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header);
	int ping_timeout_active = 0;
	int empty, pkt_size;
	enum drbd_packet cmd_nr;

	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(mdev, thi);
		if (test_and_clear_bit(SEND_PING, &mdev->tconn->flags)) {
			if (!drbd_send_ping(mdev)) {
				dev_err(DEV, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			mdev->tconn->meta.socket->sk->sk_rcvtimeo =
				mdev->tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* conditionally cork;
		 * it may hurt latency if we cork without much to send */
		if (!mdev->tconn->net_conf->no_cork &&
		    3 < atomic_read(&mdev->unacked_cnt))
			drbd_tcp_cork(mdev->tconn->meta.socket);
		while (1) {
			clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
			flush_signals(current);
			if (!drbd_process_done_ee(mdev))
				goto reconnect;
			/* to avoid race with newly queued ACKs */
			set_bit(SIGNAL_ASENDER, &mdev->tconn->flags);
			spin_lock_irq(&mdev->tconn->req_lock);
			empty = list_empty(&mdev->done_ee);
			spin_unlock_irq(&mdev->tconn->req_lock);
			/* new ack may have been queued right here,
			 * but then there is also a signal pending,
			 * and we start over... */
			if (empty)
				break;
		}
		/* but unconditionally uncork unless disabled */
		if (!mdev->tconn->net_conf->no_cork)
			drbd_tcp_uncork(mdev->tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(mdev->tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			dev_err(DEV, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(mdev->tconn->last_received,
				jiffies - mdev->tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				dev_err(DEV, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &mdev->tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (!decode_header(mdev, h, &cmd_nr, &pkt_size))
				goto reconnect;
			cmd = get_asender_cmd(cmd_nr);
			if (unlikely(cmd == NULL)) {
				dev_err(DEV, "unknown command %d on meta (l: %d)\n",
					cmd_nr, pkt_size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			if (pkt_size != expect - sizeof(struct p_header)) {
				dev_err(DEV, "Wrong packet size on meta (c: %d, l: %d)\n",
					cmd_nr, pkt_size);
				goto reconnect;
			}
		}
		if (received == expect) {
			mdev->tconn->last_received = jiffies;
			D_ASSERT(cmd != NULL);
			if (!cmd->process(mdev, cmd_nr))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
		drbd_md_sync(mdev);
	}
	if (0) {
disconnect:
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		drbd_md_sync(mdev);
	}
	clear_bit(SIGNAL_ASENDER, &mdev->tconn->flags);

	D_ASSERT(mdev->state.conn < C_CONNECTED);
	dev_info(DEV, "asender terminated\n");

	return 0;
}