/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>
#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_handshake(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(int vnr, void *p, void *data);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
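/*
 * Illustrative sketch (not part of the original file): the accessors the
 * helpers below rely on, assuming page->private of each chain member holds
 * the pointer to the next page and a zero ->private terminates the chain.
 * The real macros live elsewhere (drbd_int.h) and may additionally prefetch
 * the next page.
 */
#ifndef page_chain_next
#define page_chain_next(page) \
	((struct page *)page_private(page))
#define page_chain_for_each(page) \
	for (; page; page = page_chain_next(page))
#define page_chain_for_each_safe(page, n) \
	for (; page && ({ n = page_chain_next(page); 1; }); page = n)
#endif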
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
	tmp = page_chain_next(page);
	break; /* found sufficient pages */
	/* insufficient pages, don't use any of them. */
	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)
	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	tmp = page_chain_tail(page, NULL);
	spin_lock(&drbd_pp_lock);
	page_chain_add(&drbd_pp_pool, page, tmp);
	spin_unlock(&drbd_pp_lock);
static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */
	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_ee_has_active_page(peer_req))
		list_move(le, to_be_freed);
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);
/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->tconn->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);
	i = atomic_sub_return(i, a);
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_process_done_ee()
 *  drbd_wait_ee_list_empty()
 */
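/*
 * Hypothetical usage sketch (not in the original file) of the locking rules
 * listed above: the underscore variant expects the caller to hold req_lock,
 * the plain variant takes the lock itself.
 */
static void __maybe_unused example_wait_active_ee(struct drbd_conf *mdev)
{
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);	/* caller holds req_lock */
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* equivalent, the lock is handled internally: */
	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
}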
struct drbd_peer_request *
drbd_alloc_ee(struct drbd_conf *mdev, u64 id, sector_t sector,
	      unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
	struct drbd_peer_request *peer_req;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);

	/*
	 * The block_id is opaque to the receiver. It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	mempool_free(peer_req, drbd_ee_mempool);
void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		       int is_net)
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_pp_free(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		drbd_free_some_ee(mdev, peer_req, is_net);
/* See also comments in _req_mod(,BARRIER_ACKED)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = peer_req->w.cb(&peer_req->w, !ok) && ok;
		drbd_free_ee(mdev, peer_req);
	wake_up(&mdev->ee_wait);
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
/* see also kernel_accept, which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
	struct sock *sk = sock->sk;

	err = sock->ops->listen(sock, 5);

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);

	err = sock->ops->accept(sock, *newsock, 0);
		sock_release(*newsock);

	(*newsock)->ops = sock->ops;
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL

	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);

	/*
	 * ECONNRESET	other side closed the connection
	 * ERESTARTSYS	(on sock) we got a signal
	 */
	if (rv == -ECONNRESET)
		conn_info(tconn, "sock was reset by peer\n");
	else if (rv != -ERESTARTSYS)
		conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");

	/* signal came in, or peer/link went down,
	 * after we read a partial message
	 */
	/* D_ASSERT(signal_pending(current)); */
		drbd_force_state(tconn->volume0, NS(conn, C_BROKEN_PIPE));

/* On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
	/* open coded SO_SNDBUF, SO_RCVBUF */
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
	struct sockaddr_in6 src_in6;
	int disconnect_on_error = 1;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = tconn->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, tconn->net_conf->my_addr,
	       min_t(int, tconn->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)tconn->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      tconn->net_conf->my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock,
				 (struct sockaddr *)tconn->net_conf->peer_addr,
				 tconn->net_conf->peer_addr_len, 0);

		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;

		conn_err(tconn, "%s failed, err = %d\n", what, err);

	if (disconnect_on_error)
		drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
	struct socket *s_estab = NULL, *s_listen;

	if (!get_net_conf(tconn))

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)tconn->net_conf->my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = tconn->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
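	/* e.g. try_connect_int = 10 s: timeo becomes 10*HZ +/- 10*HZ/7,
	 * i.e. roughly 8.6 s .. 11.4 s -- a total spread of ~28.5% -- so
	 * both peers don't keep retrying in lock step. */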
	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, tconn->net_conf->sndbuf_size,
			tconn->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
				  (struct sockaddr *) tconn->net_conf->my_addr,
				  tconn->net_conf->my_addr_len);

	err = drbd_accept(&what, s_listen, &s_estab);

	sock_release(s_listen);
	if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
		conn_err(tconn, "%s failed, err = %d\n", what, err);
		drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
static int drbd_send_fp(struct drbd_tconn *tconn, struct socket *sock, enum drbd_packet cmd)
	struct p_header *h = &tconn->data.sbuf.header;

	return _conn_send_cmd(tconn, 0, sock, cmd, h, sizeof(*h), 0);

static enum drbd_packet drbd_recv_fp(struct drbd_tconn *tconn, struct socket *sock)
	struct p_header80 *h = &tconn->data.rbuf.header.h80;

	rr = drbd_recv_short(sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == cpu_to_be32(DRBD_MAGIC))
		return be16_to_cpu(h->command);

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
static int drbd_connected(int vnr, void *p, void *data)
	struct drbd_conf *mdev = (struct drbd_conf *)p;

	atomic_set(&mdev->packet_seq, 0);

	ok &= drbd_send_sync_param(mdev, &mdev->sync_conf);
	ok &= drbd_send_sizes(mdev, 0, 0);
	ok &= drbd_send_uuids(mdev);
	ok &= drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_tconn *tconn)
	struct socket *s, *sock, *msock;

	if (drbd_request_state(tconn->volume0, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);
	tconn->agreed_pro_version = 99;
	/* agreed_pro_version must be smaller than 100 so we send the old
	   header (h80) in the first packet and in the handshake packet. */

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(tconn);
		/* give the other side time to call bind() & listen() */
		schedule_timeout_interruptible(HZ / 10);

			drbd_send_fp(tconn, s, P_HAND_SHAKE_S);
			drbd_send_fp(tconn, s, P_HAND_SHAKE_M);
			conn_err(tconn, "Logic error in drbd_connect()\n");
			goto out_release_sockets;

			schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(&sock);
			ok = drbd_socket_okay(&msock) && ok;

		s = drbd_wait_for_connect(tconn);
			try = drbd_recv_fp(tconn, s);
			drbd_socket_okay(&sock);
			drbd_socket_okay(&msock);
				conn_warn(tconn, "initial packet S crossed\n");
				conn_warn(tconn, "initial packet M crossed\n");
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				conn_warn(tconn, "Error receiving initial packet\n");

		if (tconn->volume0->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;

	ok = drbd_socket_okay(&sock);
	ok = drbd_socket_okay(&msock) && ok;

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = tconn->net_conf->ping_timeo*4*HZ/10;
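	/* ping_timeo is configured in units of 0.1 s, so this arms both
	 * directions with 4x the configured ping timeout, in jiffies
	 * (e.g. ping_timeo = 5 -> 0.5 s -> 2 s handshake timeout). */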
	msock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	tconn->data.socket = sock;
	tconn->meta.socket = msock;
	tconn->last_received = jiffies;

	h = drbd_do_handshake(tconn);

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
			conn_err(tconn, "Authentication of peer failed\n");
			conn_err(tconn, "Authentication of peer failed, trying again.\n");

	if (drbd_request_state(tconn->volume0, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)

	sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	drbd_thread_start(&tconn->asender);

	if (drbd_send_protocol(tconn) == -1)

	return !idr_for_each(&tconn->volumes, drbd_connected, tconn);
static bool decode_header(struct drbd_tconn *tconn, struct p_header *h, struct packet_info *pi)
	if (h->h80.magic == cpu_to_be32(DRBD_MAGIC)) {
		pi->cmd = be16_to_cpu(h->h80.command);
		pi->size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == cpu_to_be16(DRBD_MAGIC_BIG)) {
		pi->cmd = be16_to_cpu(h->h95.command);
		pi->size = be32_to_cpu(h->h95.length) & 0x00ffffff;
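		/* the h95 header carries a 24-bit length: masking with
		 * 0x00ffffff drops the high byte, which is not part of
		 * the payload size. */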
		conn_err(tconn, "magic?? on data m: 0x%08x c: %d l: %d\n",
			 be32_to_cpu(h->h80.magic),
			 be16_to_cpu(h->h80.command),
			 be16_to_cpu(h->h80.length));

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
	struct p_header *h = &tconn->data.rbuf.header;

	r = drbd_recv(tconn, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			conn_warn(tconn, "short read expecting header on sock: r=%d\n", r);

	r = decode_header(tconn, h, pi);
	tconn->last_received = jiffies;
static void drbd_flush(struct drbd_conf *mdev)
	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);

	epoch_size = atomic_read(&epoch->epoch_size);

	switch (ev & ~EV_CLEANUP) {
		atomic_dec(&epoch->active);
	case EV_GOT_BARRIER_NR:
		set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
	case EV_BECAME_LAST:

	if (epoch_size != 0 &&
	    atomic_read(&epoch->active) == 0 &&
	    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
		if (!(ev & EV_CLEANUP)) {
			spin_unlock(&mdev->epoch_lock);
			drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
			spin_lock(&mdev->epoch_lock);

		if (mdev->current_epoch != epoch) {
			next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
			list_del(&epoch->list);
			ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
			if (rv == FE_STILL_LIVE)

			atomic_set(&epoch->epoch_size, 0);
			/* atomic_set(&epoch->active, 0); is already zero */
			if (rv == FE_STILL_LIVE)
			wake_up(&mdev->ee_wait);

	spin_unlock(&mdev->epoch_lock);
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",

	pwo = mdev->write_ordering;
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
/**
 * drbd_submit_ee() - submit the pages of a peer request to the local IO subsystem
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
		   const unsigned rw, const int fault_type)
	struct bio *bios = NULL;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
	bio = bio_alloc(GFP_NOIO, nr_pages);
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_endio_sec;

	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);

	D_ASSERT(page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
		bios = bios->bi_next;
		bio->bi_next = NULL;
		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
		wake_up(&mdev->misc_wait);
static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packet cmd,
			   unsigned int data_size)
	struct p_barrier *p = &mdev->tconn->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack only after the barrier
	 * request itself has completed. */
	switch (mdev->write_ordering) {
		if (rv == FE_RECYCLED)

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");

		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		/* The current_epoch got recycled while we allocated this one... */

	spin_unlock(&mdev->epoch_lock);
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev->tconn, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",

	if (!expect(data_size != 0))
	if (!expect(IS_ALIGNED(data_size, 512)))
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);

	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		rr = drbd_recv(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;

			drbd_free_ee(mdev, peer_req);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",

		drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
				      dgs, dig_in, dig_vv, peer_req);
			drbd_free_ee(mdev, peer_req);

	mdev->recv_cnt += data_size>>9;
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
	page = drbd_pp_alloc(mdev, 1, 1);

	rr = drbd_recv(mdev->tconn, data, min_t(int, data_size, PAGE_SIZE));
	if (rr != min_t(int, data_size, PAGE_SIZE)) {
		if (!signal_pending(current))
			dev_warn(DEV,
				"short read receiving data: read %d expected %d\n",
				rr, min_t(int, data_size, PAGE_SIZE));

	drbd_pp_free(mdev, page, 0);
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
	struct bio_vec *bvec;
	int dgs, rr, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;

		rr = drbd_recv(mdev->tconn, dig_in, dgs);
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev->tconn,
			       kmap(bvec->bv_page)+bvec->bv_offset,
			       expect);
		kunmap(bvec->bv_page);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",

		drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");

	D_ASSERT(data_size == 0);
/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);
		ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */
	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)

	dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
		(unsigned long)id, (unsigned long long)sector);
static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			     unsigned int data_size)
	struct drbd_request *req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int data_size)
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_sec. */
		ok = recv_resync_read(mdev, sector, data_size);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);

	atomic_add(data_size >> 9, &mdev->rs_sect_in);
/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_work *w, int cancel)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
			ok = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		spin_unlock_irq(&mdev->tconn->req_lock);
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

static int e_send_discard_ack(struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req = (struct drbd_peer_request *)w;
	struct drbd_conf *mdev = w->mdev;

	D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, peer_req);

	spin_lock_irq(&mdev->tconn->req_lock);
	D_ASSERT(!drbd_interval_empty(&peer_req->i));
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
static bool seq_greater(u32 a, u32 b)
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 */
	return (s32)a - (s32)b > 0;
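/*
 * Worked example of the wrap-around comparison above:
 * a = 0x00000001 and b = 0xffffffff (b is "just before" the wrap):
 * (s32)a - (s32)b = 1 - (-1) = 2 > 0, so a is correctly considered
 * newer, even though a < b as plain unsigned numbers.
 */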
static u32 seq_max(u32 a, u32 b)
	return seq_greater(a, b) ? a : b;

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
	unsigned int old_peer_seq;

	spin_lock(&mdev->peer_seq_lock);
	old_peer_seq = mdev->peer_seq;
	mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
	spin_unlock(&mdev->peer_seq_lock);
	if (old_peer_seq != peer_seq)
		wake_up(&mdev->seq_wait);
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
	spin_lock(&mdev->peer_seq_lock);
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		if (!seq_greater(packet_seq, mdev->peer_seq + 1))
		if (signal_pending(current)) {
		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
	finish_wait(&mdev->seq_wait, &wait);
	if (mdev->peer_seq+1 == packet_seq)
	spin_unlock(&mdev->peer_seq_lock);
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
	return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
	       (dpf & DP_FUA ? REQ_FUA : 0) |
	       (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
	       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
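/*
 * Illustrative sketch (not part of this file) of the inverse mapping
 * referenced above; the real bio_flags_to_wire() lives on the sending side
 * and may additionally depend on the agreed protocol version.
 */
static inline u32 example_bio_flags_to_wire(unsigned long bi_rw)
{
	return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
	       (bi_rw & REQ_FUA ? DP_FUA : 0) |
	       (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
	       (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}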
/* mirrored write */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packet cmd,
			unsigned int data_size)
	struct drbd_peer_request *peer_req;
	struct p_data *p = &mdev->tconn->data.rbuf.data;

	if (!get_ldev(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);

	/* get_ldev(mdev) successful.
	 * Corresponding put_ldev done either below (on various errors),
	 * or in drbd_endio_sec, if we successfully submit the data at
	 * the end of this function. */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, data_size);

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	peer_req->epoch = mdev->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference. */
	if (!mdev->tconn->net_conf->two_primaries) {
		spin_lock_irq(&mdev->tconn->req_lock);
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = peer_req->i.size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);

		D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check for conflicting write requests.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * if no conflicting request is found:
		 *	submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	block the receiver, waiting on misc_wait
		 *	until no more conflicting requests are there,
		 *	or we get interrupted (disconnect).
		 *
		 *	we do not just write after local io completion of those
		 *	requests, but only after req is done completely, i.e.
		 *	we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->tconn->req_lock);

			struct drbd_interval *i;
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
					TASK_INTERRUPTIBLE);

			i = drbd_find_overlap(&mdev->write_requests, sector, size);
				/* only ALERT on first iteration,
				 * we may be woken up early... */
				dev_alert(DEV, "%s[%u] Concurrent %s write detected!"
					" new: %llus +%u; pending: %llus +%u\n",
					current->comm, current->pid,
					i->local ? "local" : "remote",
					(unsigned long long)sector, size,
					(unsigned long long)i->sector, i->size);

					struct drbd_request *req2;

					req2 = container_of(i, struct drbd_request, i);
					if (req2->rq_state & RQ_NET_PENDING)

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
					(unsigned long long)sector);
				peer_req->w.cb = e_send_discard_ack;
				list_add_tail(&peer_req->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->tconn->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				wake_asender(mdev->tconn);
				finish_wait(&mdev->misc_wait, &wait);

			if (signal_pending(current)) {
				spin_unlock_irq(&mdev->tconn->req_lock);
				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;

			/* Indicate to wake up mdev->misc_wait upon completion. */

			spin_unlock_irq(&mdev->tconn->req_lock);
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
					"sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);

			spin_lock_irq(&mdev->tconn->req_lock);

		finish_wait(&mdev->misc_wait, &wait);

		drbd_insert_interval(&mdev->write_requests, &peer_req->i);

	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	switch (mdev->tconn->net_conf->wire_protocol) {
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, peer_req->i.sector);

	if (drbd_submit_ee(mdev, peer_req, rw, DRBD_FAULT_DT_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, peer_req->i.sector);

	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);

	drbd_free_ee(mdev, peer_req);
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
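/*
 * Worked example for the rate check below: if the last two step marks show
 * db = 8192 bitmap bits cleared over dt = 4 seconds, then
 * dbdt = Bit2KB(8192 / 4) = 8192 KB/s (one bitmap bit covers a 4 KiB block,
 * so Bit2KB(x) is x * 4), which is then compared against the configured
 * c_min_rate.
 */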
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;

	/* feature disabled? */
	if (mdev->sync_conf.c_min_rate == 0)

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
		/* Do not slow down if app IO is already waiting for this extent */
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->sync_conf.c_min_rate)
static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packet cmd,
			       unsigned int digest_size)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->tconn->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
				"no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(mdev, digest_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);

	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);

	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);

		di->digest_size = digest_size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv(mdev->tconn, di->digest, digest_size) != digest_size)

		if (cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &mdev->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(mdev);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;

		if (mdev->ov_start_sector == ~(sector_t)0 &&
		    mdev->tconn->agreed_pro_version >= 90) {
			unsigned long now = jiffies;

			mdev->ov_start_sector = sector;
			mdev->ov_position = sector;
			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
			mdev->rs_total = mdev->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				mdev->rs_mark_left[i] = mdev->ov_left;
				mdev->rs_mark_time[i] = now;
			dev_info(DEV, "Online Verify start sector: %llu\n",
				(unsigned long long)sector);
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;

		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
		fault_type = DRBD_FAULT_MAX;

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways. */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
		schedule_timeout_uninterruptible(HZ/10);
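		/* HZ/10 jiffies == 100 ms: back off briefly before issuing
		 * this resync request, as explained above. */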
	if (drbd_rs_begin_io(mdev, sector))

submit_for_resync:
	atomic_add(size >> 9, &mdev->rs_sect_ev);

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add_tail(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (drbd_submit_ee(mdev, peer_req, READ, fault_type) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	drbd_free_ee(mdev, peer_req);
static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;

	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
	peer = mdev->p_uuid[UI_BITMAP] & 1;

	ch_peer = mdev->p_uuid[UI_SIZE];
	ch_self = mdev->comm_bm_set;

	switch (mdev->tconn->net_conf->after_sb_0p) {
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
		if (self == 1 && peer == 0) {
		/* Else fall through to one of the other strategies... */
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
			 "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		if (mdev->tconn->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
		else if (ch_self > ch_peer)
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
	switch (mdev->tconn->net_conf->after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
		dev_err(DEV, "Configuration error.\n");
	case ASB_DISCONNECT:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_SECONDARY)
		if (hg == 1 && mdev->state.role == R_PRIMARY)
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCARD_SECONDARY:
		return mdev->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
		if (hg == -1 && mdev->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			drbd_set_role(mdev, R_SECONDARY, 0);
			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
				dev_warn(DEV, "Successfully gave up primary role.\n");
static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
	switch (mdev->tconn->net_conf->after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_SECONDARY:
		dev_err(DEV, "Configuration error.\n");
		rv = drbd_asb_recover_0p(mdev);
	case ASB_DISCONNECT:
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(mdev);
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(mdev, "pri-lost-after-sb");
				dev_warn(DEV, "Successfully gave up primary role.\n");
2369 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2370 u64 bits, u64 flags)
2373 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2376 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2378 (unsigned long long)uuid[UI_CURRENT],
2379 (unsigned long long)uuid[UI_BITMAP],
2380 (unsigned long long)uuid[UI_HISTORY_START],
2381 (unsigned long long)uuid[UI_HISTORY_END],
2382 (unsigned long long)bits,
2383 (unsigned long long)flags);
2387 100 after split brain try auto recover
2388 2 C_SYNC_SOURCE set BitMap
2389 1 C_SYNC_SOURCE use BitMap
2391 -1 C_SYNC_TARGET use BitMap
2392 -2 C_SYNC_TARGET set BitMap
2393 -100 after split brain, disconnect
2394 -1000 unrelated data
2395 -1091 requires proto 91
2396 -1096 requires proto 96
2398 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2403 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2404 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2407 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2411 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2412 peer != UUID_JUST_CREATED)
2416 if (self != UUID_JUST_CREATED &&
2417 (peer == UUID_JUST_CREATED || peer == (u64)0))
2421 int rct, dc; /* roles at crash time */
2423 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2425 if (mdev->tconn->agreed_pro_version < 91)
2428 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2429 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2430 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2431 drbd_uuid_set_bm(mdev, 0UL);
2433 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2434 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2437 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2444 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2446 if (mdev->tconn->agreed_pro_version < 91)
2449 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2450 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2451 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2453 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2454 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2455 mdev->p_uuid[UI_BITMAP] = 0UL;
2457 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2460 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2467 /* Common power [off|failure] */
2468 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2469 (mdev->p_uuid[UI_FLAGS] & 2);
2470 /* lowest bit is set when we were primary,
2471 * next bit (weight 2) is set when peer was primary */
2475 case 0: /* !self_pri && !peer_pri */ return 0;
2476 case 1: /* self_pri && !peer_pri */ return 1;
2477 case 2: /* !self_pri && peer_pri */ return -1;
2478 case 3: /* self_pri && peer_pri */
2479 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2485 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2490 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2492 if (mdev->tconn->agreed_pro_version < 96 ?
2493 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2494 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2495 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2496 /* The last P_SYNC_UUID did not get though. Undo the last start of
2497 resync as sync source modifications of the peer's UUIDs. */
2499 if (mdev->tconn->agreed_pro_version < 91)
2502 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2503 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2505 dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2506 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2513 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2514 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2515 peer = mdev->p_uuid[i] & ~((u64)1);
2521 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2522 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2527 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2529 if (mdev->tconn->agreed_pro_version < 96 ?
2530 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2531 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2532 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2533 /* The last P_SYNC_UUID did not get though. Undo the last start of
2534 resync as sync source modifications of our UUIDs. */
2536 if (mdev->tconn->agreed_pro_version < 91)
2539 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2540 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2542 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2543 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2544 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2552 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2553 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2554 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2560 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2561 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2562 if (self == peer && self != ((u64)0))
2566 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2567 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2568 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2569 peer = mdev->p_uuid[j] & ~((u64)1);
2578 /* drbd_sync_handshake() returns the new conn state on success, or
2579 CONN_MASK (-1) on failure.
2581 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2582 enum drbd_disk_state peer_disk) __must_hold(local)
2585 enum drbd_conns rv = C_MASK;
2586 enum drbd_disk_state mydisk;
2588 mydisk = mdev->state.disk;
2589 if (mydisk == D_NEGOTIATING)
2590 mydisk = mdev->new_state_tmp.disk;
2592 dev_info(DEV, "drbd_sync_handshake:\n");
2593 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2594 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2595 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2597 hg = drbd_uuid_compare(mdev, &rule_nr);
2599 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2602 dev_alert(DEV, "Unrelated data, aborting!\n");
2606 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2610 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2611 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2612 int f = (hg == -100) || abs(hg) == 2;
2613 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2616 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2617 hg > 0 ? "source" : "target");
2621 drbd_khelper(mdev, "initial-split-brain");
2623 if (hg == 100 || (hg == -100 && mdev->tconn->net_conf->always_asbp)) {
2624 int pcount = (mdev->state.role == R_PRIMARY)
2625 + (peer_role == R_PRIMARY);
2626 int forced = (hg == -100);
2630 hg = drbd_asb_recover_0p(mdev);
2633 hg = drbd_asb_recover_1p(mdev);
2636 hg = drbd_asb_recover_2p(mdev);
2639 if (abs(hg) < 100) {
2640 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2641 "automatically solved. Sync from %s node\n",
2642 pcount, (hg < 0) ? "peer" : "this");
2644 dev_warn(DEV, "Doing a full sync, since"
2645 " UUIDs where ambiguous.\n");
2652 if (mdev->tconn->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2654 if (!mdev->tconn->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2658 dev_warn(DEV, "Split-Brain detected, manually solved. "
2659 "Sync from %s node\n",
2660 (hg < 0) ? "peer" : "this");
2664 /* FIXME this log message is not correct if we end up here
2665 * after an attempted attach on a diskless node.
2666 * We just refuse to attach -- well, we drop the "connection"
2667 * to that disk, in a way... */
2668 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2669 drbd_khelper(mdev, "split-brain");
2673 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2674 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2678 if (hg < 0 && /* by intention we do not use mydisk here. */
2679 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2680 switch (mdev->tconn->net_conf->rr_conflict) {
2681 case ASB_CALL_HELPER:
2682 drbd_khelper(mdev, "pri-lost");
2684 case ASB_DISCONNECT:
2685 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2688 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2693 if (mdev->tconn->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2695 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2697 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2698 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2699 abs(hg) >= 2 ? "full" : "bit-map based");
2704 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2705 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2706 BM_LOCKED_SET_ALLOWED))
2710 if (hg > 0) { /* become sync source. */
2712 } else if (hg < 0) { /* become sync target */
2716 if (drbd_bm_total_weight(mdev)) {
2717 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2718 drbd_bm_total_weight(mdev));
2725 /* returns 1 if invalid */
2726 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2728 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2729 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2730 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2733 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2734 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2735 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2738 /* everything else is valid if they are equal on both sides. */
2742 /* everything es is invalid. */
2746 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packet cmd,
2747 unsigned int data_size)
2749 struct p_protocol *p = &mdev->tconn->data.rbuf.protocol;
2750 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2751 int p_want_lose, p_two_primaries, cf;
2752 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2754 p_proto = be32_to_cpu(p->protocol);
2755 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2756 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2757 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2758 p_two_primaries = be32_to_cpu(p->two_primaries);
2759 cf = be32_to_cpu(p->conn_flags);
2760 p_want_lose = cf & CF_WANT_LOSE;
2762 clear_bit(CONN_DRY_RUN, &mdev->flags);
2764 if (cf & CF_DRY_RUN)
2765 set_bit(CONN_DRY_RUN, &mdev->flags);
2767 if (p_proto != mdev->tconn->net_conf->wire_protocol) {
2768 dev_err(DEV, "incompatible communication protocols\n");
2772 if (cmp_after_sb(p_after_sb_0p, mdev->tconn->net_conf->after_sb_0p)) {
2773 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2777 if (cmp_after_sb(p_after_sb_1p, mdev->tconn->net_conf->after_sb_1p)) {
2778 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2782 if (cmp_after_sb(p_after_sb_2p, mdev->tconn->net_conf->after_sb_2p)) {
2783 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2787 if (p_want_lose && mdev->tconn->net_conf->want_lose) {
2788 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2792 if (p_two_primaries != mdev->tconn->net_conf->two_primaries) {
2793 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2797 if (mdev->tconn->agreed_pro_version >= 87) {
2798 unsigned char *my_alg = mdev->tconn->net_conf->integrity_alg;
2800 if (drbd_recv(mdev->tconn, p_integrity_alg, data_size) != data_size)
2803 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2804 if (strcmp(p_integrity_alg, my_alg)) {
2805 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2808 dev_info(DEV, "data-integrity-alg: %s\n",
2809 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2815 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2820 * input: alg name, feature name
2821 * return: NULL (alg name was "")
2822 * ERR_PTR(error) if something goes wrong
2823 * or the crypto hash ptr, if it worked out ok. */
2824 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2825 const char *alg, const char *name)
2827 struct crypto_hash *tfm;
2832 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2834 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2835 alg, name, PTR_ERR(tfm));
2838 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2839 crypto_free_hash(tfm);
2840 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2841 return ERR_PTR(-EINVAL);
2846 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packet cmd,
2847 unsigned int packet_size)
2850 struct p_rs_param_95 *p = &mdev->tconn->data.rbuf.rs_param_95;
2851 unsigned int header_size, data_size, exp_max_sz;
2852 struct crypto_hash *verify_tfm = NULL;
2853 struct crypto_hash *csums_tfm = NULL;
2854 const int apv = mdev->tconn->agreed_pro_version;
2855 int *rs_plan_s = NULL;
2858 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2859 : apv == 88 ? sizeof(struct p_rs_param)
2861 : apv <= 94 ? sizeof(struct p_rs_param_89)
2862 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2864 if (packet_size > exp_max_sz) {
2865 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2866 packet_size, exp_max_sz);
2871 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header);
2872 data_size = packet_size - header_size;
2873 } else if (apv <= 94) {
2874 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header);
2875 data_size = packet_size - header_size;
2876 D_ASSERT(data_size == 0);
2878 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header);
2879 data_size = packet_size - header_size;
2880 D_ASSERT(data_size == 0);
2883 /* initialize verify_alg and csums_alg */
2884 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2886 if (drbd_recv(mdev->tconn, &p->head.payload, header_size) != header_size)
2889 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2893 if (data_size > SHARED_SECRET_MAX) {
2894 dev_err(DEV, "verify-alg too long, "
2895 "peer wants %u, accepting only %u byte\n",
2896 data_size, SHARED_SECRET_MAX);
2900 if (drbd_recv(mdev->tconn, p->verify_alg, data_size) != data_size)
2903 /* we expect NUL terminated string */
2904 /* but just in case someone tries to be evil */
2905 D_ASSERT(p->verify_alg[data_size-1] == 0);
2906 p->verify_alg[data_size-1] = 0;
2908 } else /* apv >= 89 */ {
2909 /* we still expect NUL terminated strings */
2910 /* but just in case someone tries to be evil */
2911 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2912 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2913 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2914 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2917 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2918 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2919 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2920 mdev->sync_conf.verify_alg, p->verify_alg);
2923 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2924 p->verify_alg, "verify-alg");
2925 if (IS_ERR(verify_tfm)) {
2931 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2932 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2933 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2934 mdev->sync_conf.csums_alg, p->csums_alg);
2937 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2938 p->csums_alg, "csums-alg");
2939 if (IS_ERR(csums_tfm)) {
2946 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2947 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2948 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2949 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2950 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2952 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2953 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2954 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2956 dev_err(DEV, "kmalloc of fifo_buffer failed");
2962 spin_lock(&mdev->peer_seq_lock);
2963 /* lock against drbd_nl_syncer_conf() */
2965 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2966 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2967 crypto_free_hash(mdev->verify_tfm);
2968 mdev->verify_tfm = verify_tfm;
2969 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2972 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2973 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2974 crypto_free_hash(mdev->csums_tfm);
2975 mdev->csums_tfm = csums_tfm;
2976 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2978 if (fifo_size != mdev->rs_plan_s.size) {
2979 kfree(mdev->rs_plan_s.values);
2980 mdev->rs_plan_s.values = rs_plan_s;
2981 mdev->rs_plan_s.size = fifo_size;
2982 mdev->rs_planed = 0;
2984 spin_unlock(&mdev->peer_seq_lock);
2989 /* just for completeness: actually not needed,
2990 * as this is not reached if csums_tfm was ok. */
2991 crypto_free_hash(csums_tfm);
2992 /* but free the verify_tfm again, if csums_tfm did not work out */
2993 crypto_free_hash(verify_tfm);
2994 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2998 /* warn if the arguments differ by more than 12.5% */
2999 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3000 const char *s, sector_t a, sector_t b)
3003 if (a == 0 || b == 0)
3005 d = (a > b) ? (a - b) : (b - a);
3006 if (d > (a>>3) || d > (b>>3))
3007 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3008 (unsigned long long)a, (unsigned long long)b);
3011 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packet cmd,
3012 unsigned int data_size)
3014 struct p_sizes *p = &mdev->tconn->data.rbuf.sizes;
3015 enum determine_dev_size dd = unchanged;
3016 sector_t p_size, p_usize, my_usize;
3017 int ldsc = 0; /* local disk size changed */
3018 enum dds_flags ddsf;
3020 p_size = be64_to_cpu(p->d_size);
3021 p_usize = be64_to_cpu(p->u_size);
3023 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
3024 dev_err(DEV, "some backing storage is needed\n");
3025 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3029 /* just store the peer's disk size for now.
3030 * we still need to figure out whether we accept that. */
3031 mdev->p_size = p_size;
3033 if (get_ldev(mdev)) {
3034 warn_if_differ_considerably(mdev, "lower level device sizes",
3035 p_size, drbd_get_max_capacity(mdev->ldev));
3036 warn_if_differ_considerably(mdev, "user requested size",
3037 p_usize, mdev->ldev->dc.disk_size);
3039 /* if this is the first connect, or an otherwise expected
3040 * param exchange, choose the minimum */
3041 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3042 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3045 my_usize = mdev->ldev->dc.disk_size;
3047 if (mdev->ldev->dc.disk_size != p_usize) {
3048 mdev->ldev->dc.disk_size = p_usize;
3049 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3050 (unsigned long)mdev->ldev->dc.disk_size);
3053 /* Never shrink a device with usable data during connect.
3054 But allow online shrinking if we are connected. */
3055 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3056 drbd_get_capacity(mdev->this_bdev) &&
3057 mdev->state.disk >= D_OUTDATED &&
3058 mdev->state.conn < C_CONNECTED) {
3059 dev_err(DEV, "The peer's disk size is too small!\n");
3060 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3061 mdev->ldev->dc.disk_size = my_usize;
3068 ddsf = be16_to_cpu(p->dds_flags);
3069 if (get_ldev(mdev)) {
3070 dd = drbd_determine_dev_size(mdev, ddsf);
3072 if (dd == dev_size_error)
3076 /* I am diskless, need to accept the peer's size. */
3077 drbd_set_my_capacity(mdev, p_size);
3080 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3081 drbd_reconsider_max_bio_size(mdev);
3083 if (get_ldev(mdev)) {
3084 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3085 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3092 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3093 if (be64_to_cpu(p->c_size) !=
3094 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3095 /* we have different sizes, probably peer
3096 * needs to know my new size... */
3097 drbd_send_sizes(mdev, 0, ddsf);
3099 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3100 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3101 if (mdev->state.pdsk >= D_INCONSISTENT &&
3102 mdev->state.disk >= D_INCONSISTENT) {
3103 if (ddsf & DDSF_NO_RESYNC)
3104 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3106 resync_after_online_grow(mdev);
3108 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3115 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packet cmd,
3116 unsigned int data_size)
3118 struct p_uuids *p = &mdev->tconn->data.rbuf.uuids;
3120 int i, updated_uuids = 0;
3122 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3124 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3125 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3127 kfree(mdev->p_uuid);
3128 mdev->p_uuid = p_uuid;
3130 if (mdev->state.conn < C_CONNECTED &&
3131 mdev->state.disk < D_INCONSISTENT &&
3132 mdev->state.role == R_PRIMARY &&
3133 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3134 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3135 (unsigned long long)mdev->ed_uuid);
3136 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3140 if (get_ldev(mdev)) {
3141 int skip_initial_sync =
3142 mdev->state.conn == C_CONNECTED &&
3143 mdev->tconn->agreed_pro_version >= 90 &&
3144 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3145 (p_uuid[UI_FLAGS] & 8);
3146 if (skip_initial_sync) {
3147 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3148 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3149 "clear_n_write from receive_uuids",
3150 BM_LOCKED_TEST_ALLOWED);
3151 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3152 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3153 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3159 } else if (mdev->state.disk < D_INCONSISTENT &&
3160 mdev->state.role == R_PRIMARY) {
3161 /* I am a diskless primary, the peer just created a new current UUID
3163 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3166 /* Before we test for the disk state, we should wait until an eventually
3167 ongoing cluster wide state change is finished. That is important if
3168 we are primary and are detaching from our disk. We need to see the
3169 new disk state... */
3170 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3171 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3172 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3175 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3181 * convert_state() - Converts the peer's view of the cluster state to our point of view
3182 * @ps: The state as seen by the peer.
3184 static union drbd_state convert_state(union drbd_state ps)
3186 union drbd_state ms;
3188 static enum drbd_conns c_tab[] = {
3189 [C_CONNECTED] = C_CONNECTED,
3191 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3192 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3193 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3194 [C_VERIFY_S] = C_VERIFY_T,
3200 ms.conn = c_tab[ps.conn];
3205 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3210 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3211 unsigned int data_size)
3213 struct p_req_state *p = &mdev->tconn->data.rbuf.req_state;
3214 union drbd_state mask, val;
3215 enum drbd_state_rv rv;
3217 mask.i = be32_to_cpu(p->mask);
3218 val.i = be32_to_cpu(p->val);
3220 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3221 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3222 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3226 mask = convert_state(mask);
3227 val = convert_state(val);
3229 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3231 drbd_send_sr_reply(mdev, rv);
3237 static int receive_state(struct drbd_conf *mdev, enum drbd_packet cmd,
3238 unsigned int data_size)
3240 struct p_state *p = &mdev->tconn->data.rbuf.state;
3241 union drbd_state os, ns, peer_state;
3242 enum drbd_disk_state real_peer_disk;
3243 enum chg_state_flags cs_flags;
3246 peer_state.i = be32_to_cpu(p->state);
3248 real_peer_disk = peer_state.disk;
3249 if (peer_state.disk == D_NEGOTIATING) {
3250 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3251 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3254 spin_lock_irq(&mdev->tconn->req_lock);
3256 os = ns = mdev->state;
3257 spin_unlock_irq(&mdev->tconn->req_lock);
3259 /* peer says his disk is uptodate, while we think it is inconsistent,
3260 * and this happens while we think we have a sync going on. */
3261 if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3262 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3263 /* If we are (becoming) SyncSource, but peer is still in sync
3264 * preparation, ignore its uptodate-ness to avoid flapping, it
3265 * will change to inconsistent once the peer reaches active
3267 * It may have changed syncer-paused flags, however, so we
3268 * cannot ignore this completely. */
3269 if (peer_state.conn > C_CONNECTED &&
3270 peer_state.conn < C_SYNC_SOURCE)
3271 real_peer_disk = D_INCONSISTENT;
3273 /* if peer_state changes to connected at the same time,
3274 * it explicitly notifies us that it finished resync.
3275 * Maybe we should finish it up, too? */
3276 else if (os.conn >= C_SYNC_SOURCE &&
3277 peer_state.conn == C_CONNECTED) {
3278 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3279 drbd_resync_finished(mdev);
3284 /* peer says his disk is inconsistent, while we think it is uptodate,
3285 * and this happens while the peer still thinks we have a sync going on,
3286 * but we think we are already done with the sync.
3287 * We ignore this to avoid flapping pdsk.
3288 * This should not happen, if the peer is a recent version of drbd. */
3289 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3290 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3291 real_peer_disk = D_UP_TO_DATE;
3293 if (ns.conn == C_WF_REPORT_PARAMS)
3294 ns.conn = C_CONNECTED;
3296 if (peer_state.conn == C_AHEAD)
3299 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3300 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3301 int cr; /* consider resync */
3303 /* if we established a new connection */
3304 cr = (os.conn < C_CONNECTED);
3305 /* if we had an established connection
3306 * and one of the nodes newly attaches a disk */
3307 cr |= (os.conn == C_CONNECTED &&
3308 (peer_state.disk == D_NEGOTIATING ||
3309 os.disk == D_NEGOTIATING));
3310 /* if we have both been inconsistent, and the peer has been
3311 * forced to be UpToDate with --overwrite-data */
3312 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3313 /* if we had been plain connected, and the admin requested to
3314 * start a sync by "invalidate" or "invalidate-remote" */
3315 cr |= (os.conn == C_CONNECTED &&
3316 (peer_state.conn >= C_STARTING_SYNC_S &&
3317 peer_state.conn <= C_WF_BITMAP_T));
3320 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3323 if (ns.conn == C_MASK) {
3324 ns.conn = C_CONNECTED;
3325 if (mdev->state.disk == D_NEGOTIATING) {
3326 drbd_force_state(mdev, NS(disk, D_FAILED));
3327 } else if (peer_state.disk == D_NEGOTIATING) {
3328 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3329 peer_state.disk = D_DISKLESS;
3330 real_peer_disk = D_DISKLESS;
3332 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3334 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3335 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3341 spin_lock_irq(&mdev->tconn->req_lock);
3342 if (mdev->state.i != os.i)
3344 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3345 ns.peer = peer_state.role;
3346 ns.pdsk = real_peer_disk;
3347 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3348 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3349 ns.disk = mdev->new_state_tmp.disk;
3350 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3351 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3352 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3353 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3354 for temporal network outages! */
3355 spin_unlock_irq(&mdev->tconn->req_lock);
3356 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3358 drbd_uuid_new_current(mdev);
3359 clear_bit(NEW_CUR_UUID, &mdev->flags);
3360 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3363 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3365 spin_unlock_irq(&mdev->tconn->req_lock);
3367 if (rv < SS_SUCCESS) {
3368 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3372 if (os.conn > C_WF_REPORT_PARAMS) {
3373 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3374 peer_state.disk != D_NEGOTIATING ) {
3375 /* we want resync, peer has not yet decided to sync... */
3376 /* Nowadays only used when forcing a node into primary role and
3377 setting its disk to UpToDate with that */
3378 drbd_send_uuids(mdev);
3379 drbd_send_state(mdev);
3383 mdev->tconn->net_conf->want_lose = 0;
3385 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3390 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packet cmd,
3391 unsigned int data_size)
3393 struct p_rs_uuid *p = &mdev->tconn->data.rbuf.rs_uuid;
3395 wait_event(mdev->misc_wait,
3396 mdev->state.conn == C_WF_SYNC_UUID ||
3397 mdev->state.conn == C_BEHIND ||
3398 mdev->state.conn < C_CONNECTED ||
3399 mdev->state.disk < D_NEGOTIATING);
3401 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3403 /* Here the _drbd_uuid_ functions are right, current should
3404 _not_ be rotated into the history */
3405 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3406 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3407 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3409 drbd_print_uuids(mdev, "updated sync uuid");
3410 drbd_start_resync(mdev, C_SYNC_TARGET);
3414 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3420 * receive_bitmap_plain
3422 * Return 0 when done, 1 when another iteration is needed, and a negative error
3423 * code upon failure.
3426 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3427 unsigned long *buffer, struct bm_xfer_ctx *c)
3429 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3430 unsigned want = num_words * sizeof(long);
3433 if (want != data_size) {
3434 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3439 err = drbd_recv(mdev->tconn, buffer, want);
3446 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3448 c->word_offset += num_words;
3449 c->bit_offset = c->word_offset * BITS_PER_LONG;
3450 if (c->bit_offset > c->bm_bits)
3451 c->bit_offset = c->bm_bits;
3459 * Return 0 when done, 1 when another iteration is needed, and a negative error
3460 * code upon failure.
3463 recv_bm_rle_bits(struct drbd_conf *mdev,
3464 struct p_compressed_bm *p,
3465 struct bm_xfer_ctx *c,
3468 struct bitstream bs;
3472 unsigned long s = c->bit_offset;
3474 int toggle = DCBP_get_start(p);
3478 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3480 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3484 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3485 bits = vli_decode_bits(&rl, look_ahead);
3491 if (e >= c->bm_bits) {
3492 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3495 _drbd_bm_set_bits(mdev, s, e);
3499 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3500 have, bits, look_ahead,
3501 (unsigned int)(bs.cur.b - p->code),
3502 (unsigned int)bs.buf_len);
3505 look_ahead >>= bits;
3508 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3511 look_ahead |= tmp << have;
3516 bm_xfer_ctx_bit_to_word_offset(c);
3518 return (s != c->bm_bits);
3524 * Return 0 when done, 1 when another iteration is needed, and a negative error
3525 * code upon failure.
3528 decode_bitmap_c(struct drbd_conf *mdev,
3529 struct p_compressed_bm *p,
3530 struct bm_xfer_ctx *c,
3533 if (DCBP_get_code(p) == RLE_VLI_Bits)
3534 return recv_bm_rle_bits(mdev, p, c, len);
3536 /* other variants had been implemented for evaluation,
3537 * but have been dropped as this one turned out to be "best"
3538 * during all our tests. */
3540 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3541 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3545 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3546 const char *direction, struct bm_xfer_ctx *c)
3548 /* what would it take to transfer it "plaintext" */
3549 unsigned plain = sizeof(struct p_header) *
3550 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3551 + c->bm_words * sizeof(long);
3552 unsigned total = c->bytes[0] + c->bytes[1];
3555 /* total can not be zero. but just in case: */
3559 /* don't report if not compressed */
3563 /* total < plain. check for overflow, still */
3564 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3565 : (1000 * total / plain);
3571 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3572 "total %u; compression: %u.%u%%\n",
3574 c->bytes[1], c->packets[1],
3575 c->bytes[0], c->packets[0],
3576 total, r/10, r % 10);
3579 /* Since we are processing the bitfield from lower addresses to higher,
3580 it does not matter if the process it in 32 bit chunks or 64 bit
3581 chunks as long as it is little endian. (Understand it as byte stream,
3582 beginning with the lowest byte...) If we would use big endian
3583 we would need to process it from the highest address to the lowest,
3584 in order to be agnostic to the 32 vs 64 bits issue.
3586 returns 0 on failure, 1 if we successfully received it. */
3587 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packet cmd,
3588 unsigned int data_size)
3590 struct bm_xfer_ctx c;
3594 struct p_header *h = &mdev->tconn->data.rbuf.header;
3595 struct packet_info pi;
3597 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3598 /* you are supposed to send additional out-of-sync information
3599 * if you actually set bits during this phase */
3601 /* maybe we should use some per thread scratch page,
3602 * and allocate that during initial device creation? */
3603 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3605 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3609 c = (struct bm_xfer_ctx) {
3610 .bm_bits = drbd_bm_bits(mdev),
3611 .bm_words = drbd_bm_words(mdev),
3615 if (cmd == P_BITMAP) {
3616 err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3617 } else if (cmd == P_COMPRESSED_BITMAP) {
3618 /* MAYBE: sanity check that we speak proto >= 90,
3619 * and the feature is enabled! */
3620 struct p_compressed_bm *p;
3622 if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3623 dev_err(DEV, "ReportCBitmap packet too large\n");
3626 /* use the page buff */
3628 memcpy(p, h, sizeof(*h));
3629 if (drbd_recv(mdev->tconn, p->head.payload, data_size) != data_size)
3631 if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3632 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3635 err = decode_bitmap_c(mdev, p, &c, data_size);
3637 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3641 c.packets[cmd == P_BITMAP]++;
3642 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header) + data_size;
3649 if (!drbd_recv_header(mdev->tconn, &pi))
3652 data_size = pi.size;
3655 INFO_bm_xfer_stats(mdev, "receive", &c);
3657 if (mdev->state.conn == C_WF_BITMAP_T) {
3658 enum drbd_state_rv rv;
3660 ok = !drbd_send_bitmap(mdev);
3663 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3664 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3665 D_ASSERT(rv == SS_SUCCESS);
3666 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3667 /* admin may have requested C_DISCONNECTING,
3668 * other threads may have noticed network errors */
3669 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3670 drbd_conn_str(mdev->state.conn));
3675 drbd_bm_unlock(mdev);
3676 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3677 drbd_start_resync(mdev, C_SYNC_SOURCE);
3678 free_page((unsigned long) buffer);
3682 static int receive_skip(struct drbd_conf *mdev, enum drbd_packet cmd,
3683 unsigned int data_size)
3685 /* TODO zero copy sink :) */
3686 static char sink[128];
3689 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3694 want = min_t(int, size, sizeof(sink));
3695 r = drbd_recv(mdev->tconn, sink, want);
3703 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packet cmd,
3704 unsigned int data_size)
3706 /* Make sure we've acked all the TCP data associated
3707 * with the data requests being unplugged */
3708 drbd_tcp_quickack(mdev->tconn->data.socket);
3713 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packet cmd,
3714 unsigned int data_size)
3716 struct p_block_desc *p = &mdev->tconn->data.rbuf.block_desc;
3718 switch (mdev->state.conn) {
3719 case C_WF_SYNC_UUID:
3724 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3725 drbd_conn_str(mdev->state.conn));
3728 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3733 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packet cmd,
3734 unsigned int to_receive);
3739 drbd_cmd_handler_f function;
3742 static struct data_cmd drbd_cmd_handler[] = {
3743 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
3744 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
3745 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3746 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3747 [P_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3748 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header), receive_bitmap } ,
3749 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header), receive_UnplugRemote },
3750 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3751 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3752 [P_SYNC_PARAM] = { 1, sizeof(struct p_header), receive_SyncParam },
3753 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header), receive_SyncParam },
3754 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
3755 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
3756 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
3757 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
3758 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
3759 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3760 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3761 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3762 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3763 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
3764 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3765 /* anything missing from this table is in
3766 * the asender_tbl, see get_asender_cmd */
3767 [P_MAX_CMD] = { 0, 0, NULL },
3770 /* All handler functions that expect a sub-header get that sub-heder in
3771 mdev->tconn->data.rbuf.header.head.payload.
3773 Usually in mdev->tconn->data.rbuf.header.head the callback can find the usual
3774 p_header, but they may not rely on that. Since there is also p_header95 !
3777 static void drbdd(struct drbd_tconn *tconn)
3779 struct p_header *header = &tconn->data.rbuf.header;
3780 struct packet_info pi;
3781 size_t shs; /* sub header size */
3784 while (get_t_state(&tconn->receiver) == RUNNING) {
3785 drbd_thread_current_set_cpu(&tconn->receiver);
3786 if (!drbd_recv_header(tconn, &pi))
3789 if (unlikely(pi.cmd >= P_MAX_CMD || !drbd_cmd_handler[pi.cmd].function)) {
3790 conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
3794 shs = drbd_cmd_handler[pi.cmd].pkt_size - sizeof(struct p_header);
3795 if (pi.size - shs > 0 && !drbd_cmd_handler[pi.cmd].expect_payload) {
3796 conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
3801 rv = drbd_recv(tconn, &header->payload, shs);
3802 if (unlikely(rv != shs)) {
3803 if (!signal_pending(current))
3804 conn_warn(tconn, "short read while reading sub header: rv=%d\n", rv);
3809 rv = drbd_cmd_handler[pi.cmd].function(vnr_to_mdev(tconn, pi.vnr), pi.cmd, pi.size - shs);
3811 if (unlikely(!rv)) {
3812 conn_err(tconn, "error receiving %s, l: %d!\n",
3813 cmdname(pi.cmd), pi.size);
3820 drbd_force_state(tconn->volume0, NS(conn, C_PROTOCOL_ERROR));
3824 void drbd_flush_workqueue(struct drbd_conf *mdev)
3826 struct drbd_wq_barrier barr;
3828 barr.w.cb = w_prev_work_done;
3830 init_completion(&barr.done);
3831 drbd_queue_work(&mdev->tconn->data.work, &barr.w);
3832 wait_for_completion(&barr.done);
3835 static void drbd_disconnect(struct drbd_tconn *tconn)
3837 union drbd_state os, ns;
3838 int rv = SS_UNKNOWN_ERROR;
3840 if (tconn->volume0->state.conn == C_STANDALONE)
3843 /* asender does not clean up anything. it must not interfere, either */
3844 drbd_thread_stop(&tconn->asender);
3845 drbd_free_sock(tconn);
3847 idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
3849 conn_info(tconn, "Connection closed\n");
3851 spin_lock_irq(&tconn->req_lock);
3852 os = tconn->volume0->state;
3853 if (os.conn >= C_UNCONNECTED) {
3854 /* Do not restart in case we are C_DISCONNECTING */
3856 ns.conn = C_UNCONNECTED;
3857 rv = _drbd_set_state(tconn->volume0, ns, CS_VERBOSE, NULL);
3859 spin_unlock_irq(&tconn->req_lock);
3861 if (os.conn == C_DISCONNECTING) {
3862 wait_event(tconn->net_cnt_wait, atomic_read(&tconn->net_cnt) == 0);
3864 crypto_free_hash(tconn->cram_hmac_tfm);
3865 tconn->cram_hmac_tfm = NULL;
3867 kfree(tconn->net_conf);
3868 tconn->net_conf = NULL;
3869 drbd_request_state(tconn->volume0, NS(conn, C_STANDALONE));
3873 static int drbd_disconnected(int vnr, void *p, void *data)
3875 struct drbd_conf *mdev = (struct drbd_conf *)p;
3876 enum drbd_fencing_p fp;
3879 /* wait for current activity to cease. */
3880 spin_lock_irq(&mdev->tconn->req_lock);
3881 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3882 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3883 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3884 spin_unlock_irq(&mdev->tconn->req_lock);
3886 /* We do not have data structures that would allow us to
3887 * get the rs_pending_cnt down to 0 again.
3888 * * On C_SYNC_TARGET we do not have any data structures describing
3889 * the pending RSDataRequest's we have sent.
3890 * * On C_SYNC_SOURCE there is no data structure that tracks
3891 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3892 * And no, it is not the sum of the reference counts in the
3893 * resync_LRU. The resync_LRU tracks the whole operation including
3894 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3896 drbd_rs_cancel_all(mdev);
3898 mdev->rs_failed = 0;
3899 atomic_set(&mdev->rs_pending_cnt, 0);
3900 wake_up(&mdev->misc_wait);
3902 del_timer(&mdev->request_timer);
3904 /* make sure syncer is stopped and w_resume_next_sg queued */
3905 del_timer_sync(&mdev->resync_timer);
3906 resync_timer_fn((unsigned long)mdev);
3908 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3909 * w_make_resync_request etc. which may still be on the worker queue
3910 * to be "canceled" */
3911 drbd_flush_workqueue(mdev);
3913 /* This also does reclaim_net_ee(). If we do this too early, we might
3914 * miss some resync ee and pages.*/
3915 drbd_process_done_ee(mdev);
3917 kfree(mdev->p_uuid);
3918 mdev->p_uuid = NULL;
3920 if (!is_susp(mdev->state))
3926 if (get_ldev(mdev)) {
3927 fp = mdev->ldev->dc.fencing;
3931 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3932 drbd_try_outdate_peer_async(mdev);
3934 /* serialize with bitmap writeout triggered by the state change,
3936 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3938 /* tcp_close and release of sendpage pages can be deferred. I don't
3939 * want to use SO_LINGER, because apparently it can be deferred for
3940 * more than 20 seconds (longest time I checked).
3942 * Actually we don't care for exactly when the network stack does its
3943 * put_page(), but release our reference on these pages right here.
3945 i = drbd_release_ee(mdev, &mdev->net_ee);
3947 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3948 i = atomic_read(&mdev->pp_in_use_by_net);
3950 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3951 i = atomic_read(&mdev->pp_in_use);
3953 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3955 D_ASSERT(list_empty(&mdev->read_ee));
3956 D_ASSERT(list_empty(&mdev->active_ee));
3957 D_ASSERT(list_empty(&mdev->sync_ee));
3958 D_ASSERT(list_empty(&mdev->done_ee));
3960 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3961 atomic_set(&mdev->current_epoch->epoch_size, 0);
3962 D_ASSERT(list_empty(&mdev->current_epoch->list));
3968 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3969 * we can agree on is stored in agreed_pro_version.
3971 * feature flags and the reserved array should be enough room for future
3972 * enhancements of the handshake protocol, and possible plugins...
3974 * for now, they are expected to be zero, but ignored.
3976 static int drbd_send_handshake(struct drbd_tconn *tconn)
3978 /* ASSERT current == mdev->tconn->receiver ... */
3979 struct p_handshake *p = &tconn->data.sbuf.handshake;
3982 if (mutex_lock_interruptible(&tconn->data.mutex)) {
3983 conn_err(tconn, "interrupted during initial handshake\n");
3984 return 0; /* interrupted. not ok. */
3987 if (tconn->data.socket == NULL) {
3988 mutex_unlock(&tconn->data.mutex);
3992 memset(p, 0, sizeof(*p));
3993 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3994 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3995 ok = _conn_send_cmd(tconn, 0, tconn->data.socket, P_HAND_SHAKE,
3996 &p->head, sizeof(*p), 0);
3997 mutex_unlock(&tconn->data.mutex);
4003 * 1 yes, we have a valid connection
4004 * 0 oops, did not work out, please try again
4005 * -1 peer talks different language,
4006 * no point in trying again, please go standalone.
4008 static int drbd_do_handshake(struct drbd_tconn *tconn)
4010 /* ASSERT current == tconn->receiver ... */
4011 struct p_handshake *p = &tconn->data.rbuf.handshake;
4012 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4013 struct packet_info pi;
4016 rv = drbd_send_handshake(tconn);
4020 rv = drbd_recv_header(tconn, &pi);
4024 if (pi.cmd != P_HAND_SHAKE) {
4025 conn_err(tconn, "expected HandShake packet, received: %s (0x%04x)\n",
4026 cmdname(pi.cmd), pi.cmd);
4030 if (pi.size != expect) {
4031 conn_err(tconn, "expected HandShake length: %u, received: %u\n",
4036 rv = drbd_recv(tconn, &p->head.payload, expect);
4039 if (!signal_pending(current))
4040 conn_warn(tconn, "short read receiving handshake packet: l=%u\n", rv);
4044 p->protocol_min = be32_to_cpu(p->protocol_min);
4045 p->protocol_max = be32_to_cpu(p->protocol_max);
4046 if (p->protocol_max == 0)
4047 p->protocol_max = p->protocol_min;
4049 if (PRO_VERSION_MAX < p->protocol_min ||
4050 PRO_VERSION_MIN > p->protocol_max)
4053 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4055 conn_info(tconn, "Handshake successful: "
4056 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4061 conn_err(tconn, "incompatible DRBD dialects: "
4062 "I support %d-%d, peer supports %d-%d\n",
4063 PRO_VERSION_MIN, PRO_VERSION_MAX,
4064 p->protocol_min, p->protocol_max);
4068 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4069 static int drbd_do_auth(struct drbd_tconn *tconn)
4071 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4072 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4076 #define CHALLENGE_LEN 64
4080 0 - failed, try again (network error),
4081 -1 - auth failed, don't try again.
4084 static int drbd_do_auth(struct drbd_tconn *tconn)
4086 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4087 struct scatterlist sg;
4088 char *response = NULL;
4089 char *right_response = NULL;
4090 char *peers_ch = NULL;
4091 unsigned int key_len = strlen(tconn->net_conf->shared_secret);
4092 unsigned int resp_size;
4093 struct hash_desc desc;
4094 struct packet_info pi;
4097 desc.tfm = tconn->cram_hmac_tfm;
4100 rv = crypto_hash_setkey(tconn->cram_hmac_tfm,
4101 (u8 *)tconn->net_conf->shared_secret, key_len);
4103 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4108 get_random_bytes(my_challenge, CHALLENGE_LEN);
4110 rv = conn_send_cmd2(tconn, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4114 rv = drbd_recv_header(tconn, &pi);
4118 if (pi.cmd != P_AUTH_CHALLENGE) {
4119 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4120 cmdname(pi.cmd), pi.cmd);
4125 if (pi.size > CHALLENGE_LEN * 2) {
4126 conn_err(tconn, "expected AuthChallenge payload too big.\n");
4131 peers_ch = kmalloc(pi.size, GFP_NOIO);
4132 if (peers_ch == NULL) {
4133 conn_err(tconn, "kmalloc of peers_ch failed\n");
4138 rv = drbd_recv(tconn, peers_ch, pi.size);
4140 if (rv != pi.size) {
4141 if (!signal_pending(current))
4142 conn_warn(tconn, "short read AuthChallenge: l=%u\n", rv);
4147 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4148 response = kmalloc(resp_size, GFP_NOIO);
4149 if (response == NULL) {
4150 conn_err(tconn, "kmalloc of response failed\n");
4155 sg_init_table(&sg, 1);
4156 sg_set_buf(&sg, peers_ch, pi.size);
4158 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4160 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4165 rv = conn_send_cmd2(tconn, P_AUTH_RESPONSE, response, resp_size);
4169 rv = drbd_recv_header(tconn, &pi);
4173 if (pi.cmd != P_AUTH_RESPONSE) {
4174 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4175 cmdname(pi.cmd), pi.cmd);
4180 if (pi.size != resp_size) {
4181 conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4186 rv = drbd_recv(tconn, response , resp_size);
4188 if (rv != resp_size) {
4189 if (!signal_pending(current))
4190 conn_warn(tconn, "short read receiving AuthResponse: l=%u\n", rv);
4195 right_response = kmalloc(resp_size, GFP_NOIO);
4196 if (right_response == NULL) {
4197 conn_err(tconn, "kmalloc of right_response failed\n");
4202 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4204 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4206 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4211 rv = !memcmp(response, right_response, resp_size);
4214 conn_info(tconn, "Peer authenticated using %d bytes of '%s' HMAC\n",
4215 resp_size, tconn->net_conf->cram_hmac_alg);
4222 kfree(right_response);
4228 int drbdd_init(struct drbd_thread *thi)
4230 struct drbd_tconn *tconn = thi->tconn;
4233 conn_info(tconn, "receiver (re)started\n");
4236 h = drbd_connect(tconn);
4238 drbd_disconnect(tconn);
4239 schedule_timeout_interruptible(HZ);
4242 conn_warn(tconn, "Discarding network configuration.\n");
4243 drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
4248 if (get_net_conf(tconn)) {
4250 put_net_conf(tconn);
4254 drbd_disconnect(tconn);
4256 conn_info(tconn, "receiver terminated\n");
4260 /* ********* acknowledge sender ******** */
4262 static int got_RqSReply(struct drbd_conf *mdev, enum drbd_packet cmd)
4264 struct p_req_state_reply *p = &mdev->tconn->meta.rbuf.req_state_reply;
4266 int retcode = be32_to_cpu(p->retcode);
4268 if (retcode >= SS_SUCCESS) {
4269 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4271 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4272 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4273 drbd_set_st_err_str(retcode), retcode);
4275 wake_up(&mdev->state_wait);
4280 static int got_Ping(struct drbd_conf *mdev, enum drbd_packet cmd)
4282 return drbd_send_ping_ack(mdev);
4286 static int got_PingAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4288 /* restore idle timeout */
4289 mdev->tconn->meta.socket->sk->sk_rcvtimeo = mdev->tconn->net_conf->ping_int*HZ;
4290 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4291 wake_up(&mdev->misc_wait);
4296 static int got_IsInSync(struct drbd_conf *mdev, enum drbd_packet cmd)
4298 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4299 sector_t sector = be64_to_cpu(p->sector);
4300 int blksize = be32_to_cpu(p->blksize);
4302 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4304 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4306 if (get_ldev(mdev)) {
4307 drbd_rs_complete_io(mdev, sector);
4308 drbd_set_in_sync(mdev, sector, blksize);
4309 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4310 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4313 dec_rs_pending(mdev);
4314 atomic_add(blksize >> 9, &mdev->rs_sect_in);
4320 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4321 struct rb_root *root, const char *func,
4322 enum drbd_req_event what, bool missing_ok)
4324 struct drbd_request *req;
4325 struct bio_and_error m;
4327 spin_lock_irq(&mdev->tconn->req_lock);
4328 req = find_request(mdev, root, id, sector, missing_ok, func);
4329 if (unlikely(!req)) {
4330 spin_unlock_irq(&mdev->tconn->req_lock);
4333 __req_mod(req, what, &m);
4334 spin_unlock_irq(&mdev->tconn->req_lock);
4337 complete_master_bio(mdev, &m);
4341 static int got_BlockAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4343 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4344 sector_t sector = be64_to_cpu(p->sector);
4345 int blksize = be32_to_cpu(p->blksize);
4346 enum drbd_req_event what;
4348 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4350 if (p->block_id == ID_SYNCER) {
4351 drbd_set_in_sync(mdev, sector, blksize);
4352 dec_rs_pending(mdev);
4356 case P_RS_WRITE_ACK:
4357 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4358 what = WRITE_ACKED_BY_PEER_AND_SIS;
4361 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4362 what = WRITE_ACKED_BY_PEER;
4365 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B);
4366 what = RECV_ACKED_BY_PEER;
4369 D_ASSERT(mdev->tconn->net_conf->wire_protocol == DRBD_PROT_C);
4370 what = CONFLICT_DISCARDED_BY_PEER;
4377 return validate_req_change_req_state(mdev, p->block_id, sector,
4378 &mdev->write_requests, __func__,
4382 static int got_NegAck(struct drbd_conf *mdev, enum drbd_packet cmd)
4384 struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
4385 sector_t sector = be64_to_cpu(p->sector);
4386 int size = be32_to_cpu(p->blksize);
4387 bool missing_ok = mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A ||
4388 mdev->tconn->net_conf->wire_protocol == DRBD_PROT_B;
4391 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4393 if (p->block_id == ID_SYNCER) {
4394 dec_rs_pending(mdev);
4395 drbd_rs_failed_io(mdev, sector, size);
4399 found = validate_req_change_req_state(mdev, p->block_id, sector,
4400 &mdev->write_requests, __func__,
4401 NEG_ACKED, missing_ok);
4403 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4404 The master bio might already be completed, therefore the
4405 request is no longer in the collision hash. */
4406 /* In Protocol B we might already have got a P_RECV_ACK
4407 but then get a P_NEG_ACK afterwards. */
4410 drbd_set_out_of_sync(mdev, sector, size);
static int got_NegDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	sector_t sector = be64_to_cpu(p->sector);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
		(unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(mdev, p->block_id, sector,
					     &mdev->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	sector_t sector;
	int size;
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	dec_rs_pending(mdev);

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, sector);
		switch (cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			break;
		case P_RS_CANCEL:
			/* cancelled, not failed: nothing to account */
			break;
		default:
			D_ASSERT(0);
			put_ldev(mdev);
			return false;
		}
		put_ldev(mdev);
	}

	return true;
}

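/* A barrier ack means the peer has written out a whole epoch; release the
 * corresponding requests from the transfer log.  When running Ahead of the
 * peer and the last in-flight write has drained, arm the timer that starts
 * the transition back to SyncSource. */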
static int got_BarrierAck(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_barrier_ack *p = &mdev->tconn->meta.rbuf.barrier_ack;

	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));

	if (mdev->state.conn == C_AHEAD &&
	    atomic_read(&mdev->ap_in_flight) == 0 &&
	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
		mdev->start_resync_timer.expires = jiffies + HZ;
		add_timer(&mdev->start_resync_timer);
	}

	return true;
}

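/* One online-verify reply.  ID_OUT_OF_SYNC flags a mismatch; when ov_left
 * reaches zero the verify run is complete and w_ov_finished is queued
 * (with an inline fallback if allocating the work item fails). */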
static int got_OVResult(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	struct p_block_ack *p = &mdev->tconn->meta.rbuf.block_ack;
	struct drbd_work *w;
	sector_t sector;
	int size;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(mdev, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	if (!get_ldev(mdev))
		return true;

	drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
			ov_oos_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return true;
}

static int got_skip(struct drbd_conf *mdev, enum drbd_packet cmd)
{
	return true;
}

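/* Dispatch table for packets arriving on the meta-data socket: the
 * expected packet size (header included) and the handler for each command.
 * The receive loop in drbd_asender() uses it roughly like this (a sketch
 * of the flow below, not additional code):
 *
 *	cmd = get_asender_cmd(pi.cmd);	// NULL: protocol violation
 *	expect = cmd->pkt_size;		// then receive the payload
 *	cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd);
 */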
struct asender_cmd {
	size_t pkt_size;
	int (*process)(struct drbd_conf *mdev, enum drbd_packet cmd);
};

static struct asender_cmd *get_asender_cmd(int cmd)
{
	static struct asender_cmd asender_tbl[] = {
		/* anything missing from this table is in
		 * the drbd_cmd_handler (drbd_default_handler) table,
		 * see the beginning of drbdd() */
	[P_PING]	    = { sizeof(struct p_header), got_Ping },
	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_MAX_CMD]	    = { 0, NULL },
	};
	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
		return NULL;
	return &asender_tbl[cmd];
}

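/* idr_for_each() callbacks iterating over all volumes of a connection.
 * idr_for_each() stops at the first nonzero return value, hence
 * _drbd_process_done_ee() inverts its result: nonzero means "failed,
 * abort the iteration". */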
static int _drbd_process_done_ee(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	return !drbd_process_done_ee(mdev);
}

static int _check_ee_empty(int vnr, void *p, void *data)
{
	struct drbd_conf *mdev = (struct drbd_conf *)p;
	struct drbd_tconn *tconn = mdev->tconn;
	int not_empty;

	spin_lock_irq(&tconn->req_lock);
	not_empty = !list_empty(&mdev->done_ee);
	spin_unlock_irq(&tconn->req_lock);

	return not_empty;
}

static int tconn_process_done_ee(struct drbd_tconn *tconn)
{
	int not_empty, err;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		err = idr_for_each(&tconn->volumes, _drbd_process_done_ee, NULL);
		if (err)
			return err;
		set_bit(SIGNAL_ASENDER, &tconn->flags);
		not_empty = idr_for_each(&tconn->volumes, _check_ee_empty, NULL);
	} while (not_empty);

	return 0;
}

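/* The asender thread answers pings, sends the acks that the receiver
 * queued on the done_ee lists, and processes incoming packets on the
 * meta-data socket.  A missing P_PING_ACK within ping-timeout is taken
 * as proof that the peer is dead. */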
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct p_header *h = &tconn->meta.rbuf.header;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = h;
	int received = 0;
	int expect   = sizeof(struct p_header);
	int ping_timeout_active = 0;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);
		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (!drbd_send_ping(tconn->volume0)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			tconn->meta.socket->sk->sk_rcvtimeo =
				tconn->net_conf->ping_timeo*HZ/10;
			ping_timeout_active = 1;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_process_done_ee(tconn))
			goto reconnect;
		/* but unconditionally uncork unless disabled */
		if (!tconn->net_conf->no_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (!decode_header(tconn, h, &pi))
				goto reconnect;
			cmd = get_asender_cmd(pi.cmd);
			if (unlikely(cmd == NULL)) {
				conn_err(tconn, "unknown command %d on meta (l: %d)\n",
					 pi.cmd, pi.size);
				goto disconnect;
			}
			expect = cmd->pkt_size;
			if (pi.size != expect - sizeof(struct p_header)) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			tconn->last_received = jiffies;
			if (!cmd->process(vnr_to_mdev(tconn, pi.vnr), pi.cmd))
				goto reconnect;

			/* the idle_timeout (ping-int)
			 * has been restored in got_PingAck() */
			if (cmd == get_asender_cmd(P_PING_ACK))
				ping_timeout_active = 0;

			buf	 = h;
			received = 0;
			expect	 = sizeof(struct p_header);
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		drbd_force_state(tconn->volume0, NS(conn, C_NETWORK_FAILURE));
	}
	if (0) {
disconnect:
		drbd_force_state(tconn->volume0, NS(conn, C_DISCONNECTING));
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}