drbd: detach from frozen backing device
linux-2.6-block.git: drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56
57
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69         struct drbd_md_io *md_io;
70         struct drbd_conf *mdev;
71
72         md_io = (struct drbd_md_io *)bio->bi_private;
73         mdev = container_of(md_io, struct drbd_conf, md_io);
74
75         md_io->error = error;
76
77         md_io->done = 1;
78         wake_up(&mdev->misc_wait);
79         bio_put(bio);
80         drbd_md_put_buffer(mdev);
81         put_ldev(mdev);
82 }
83
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
88 {
89         unsigned long flags = 0;
90         struct drbd_conf *mdev = peer_req->w.mdev;
91
92         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
93         mdev->read_cnt += peer_req->i.size >> 9;
94         list_del(&peer_req->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
98                 __drbd_chk_io_error(mdev, false);
99         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
100
101         drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
102         put_ldev(mdev);
103 }
104
105 /* writes on behalf of the partner, or resync writes,
106  * "submitted" by the receiver, final stage.  */
107 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
108 {
109         unsigned long flags = 0;
110         struct drbd_conf *mdev = peer_req->w.mdev;
111         struct drbd_interval i;
112         int do_wake;
113         u64 block_id;
114         int do_al_complete_io;
115
116         /* after we moved peer_req to done_ee,
117          * we may no longer access it,
118          * it may be freed/reused already!
119          * (as soon as we release the req_lock) */
120         i = peer_req->i;
121         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
122         block_id = peer_req->block_id;
123
124         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
125         mdev->writ_cnt += peer_req->i.size >> 9;
126         list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
127         list_add_tail(&peer_req->w.list, &mdev->done_ee);
128
129         /*
130          * Do not remove from the write_requests tree here: we did not send the
131          * Ack yet and did not wake possibly waiting conflicting requests.
132          * They are removed from the tree by "drbd_process_done_ee" within the
133          * appropriate w.cb (e_end_block/e_end_resync_block) or by
134          * _drbd_clear_done_ee.
135          */
136
137         do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
138
139         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
140                 __drbd_chk_io_error(mdev, false);
141         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
142
143         if (block_id == ID_SYNCER)
144                 drbd_rs_complete_io(mdev, i.sector);
145
146         if (do_wake)
147                 wake_up(&mdev->ee_wait);
148
149         if (do_al_complete_io)
150                 drbd_al_complete_io(mdev, &i);
151
152         wake_asender(mdev->tconn);
153         put_ldev(mdev);
154 }
155
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_peer_request_endio(struct bio *bio, int error)
160 {
161         struct drbd_peer_request *peer_req = bio->bi_private;
162         struct drbd_conf *mdev = peer_req->w.mdev;
163         int uptodate = bio_flagged(bio, BIO_UPTODATE);
164         int is_write = bio_data_dir(bio) == WRITE;
165
166         if (error && __ratelimit(&drbd_ratelimit_state))
167                 dev_warn(DEV, "%s: error=%d s=%llus\n",
168                                 is_write ? "write" : "read", error,
169                                 (unsigned long long)peer_req->i.sector);
170         if (!error && !uptodate) {
171                 if (__ratelimit(&drbd_ratelimit_state))
172                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
173                                         is_write ? "write" : "read",
174                                         (unsigned long long)peer_req->i.sector);
175                 /* strange behavior of some lower level drivers...
176                  * fail the request by clearing the uptodate flag,
177                  * but do not return any error?! */
178                 error = -EIO;
179         }
180
181         if (error)
182                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
183
184         bio_put(bio); /* no need for the bio anymore */
185         if (atomic_dec_and_test(&peer_req->pending_bios)) {
186                 if (is_write)
187                         drbd_endio_write_sec_final(peer_req);
188                 else
189                         drbd_endio_read_sec_final(peer_req);
190         }
191 }
192
193 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
194  */
195 void drbd_request_endio(struct bio *bio, int error)
196 {
197         unsigned long flags;
198         struct drbd_request *req = bio->bi_private;
199         struct drbd_conf *mdev = req->w.mdev;
200         struct bio_and_error m;
201         enum drbd_req_event what;
202         int uptodate = bio_flagged(bio, BIO_UPTODATE);
203
204         if (!error && !uptodate) {
205                 dev_warn(DEV, "p %s: setting error to -EIO\n",
206                          bio_data_dir(bio) == WRITE ? "write" : "read");
207                 /* strange behavior of some lower level drivers...
208                  * fail the request by clearing the uptodate flag,
209                  * but do not return any error?! */
210                 error = -EIO;
211         }
212
213         /* to avoid recursion in __req_mod */
214         if (unlikely(error)) {
215                 what = (bio_data_dir(bio) == WRITE)
216                         ? WRITE_COMPLETED_WITH_ERROR
217                         : (bio_rw(bio) == READ)
218                           ? READ_COMPLETED_WITH_ERROR
219                           : READ_AHEAD_COMPLETED_WITH_ERROR;
220         } else
221                 what = COMPLETED_OK;
222
223         bio_put(req->private_bio);
224         req->private_bio = ERR_PTR(error);
225
226         /* not req_mod(), we need irqsave here! */
227         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
228         __req_mod(req, what, &m);
229         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
230
231         if (m.bio)
232                 complete_master_bio(mdev, &m);
233 }
234
235 int w_read_retry_remote(struct drbd_work *w, int cancel)
236 {
237         struct drbd_request *req = container_of(w, struct drbd_request, w);
238         struct drbd_conf *mdev = w->mdev;
239
240         /* We should not detach for read io-error,
241          * but try to WRITE the P_DATA_REPLY to the failed location,
242          * to give the disk the chance to relocate that block */
243
244         spin_lock_irq(&mdev->tconn->req_lock);
245         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
246                 _req_mod(req, READ_RETRY_REMOTE_CANCELED);
247                 spin_unlock_irq(&mdev->tconn->req_lock);
248                 return 0;
249         }
250         spin_unlock_irq(&mdev->tconn->req_lock);
251
252         return w_send_read_req(w, 0);
253 }
254
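/* drbd_csum_ee() below hashes a peer request stored as a chain of pages:
 * every page except the last contributes a full PAGE_SIZE, the last one
 * contributes i.size & (PAGE_SIZE - 1) bytes, falling back to a full page
 * when the size is an exact multiple of PAGE_SIZE (the "?:" below).
 * Illustration only: a 10 KiB request with 4 KiB pages is hashed as
 * 4 KiB + 4 KiB + 2 KiB. */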
255 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
256                   struct drbd_peer_request *peer_req, void *digest)
257 {
258         struct hash_desc desc;
259         struct scatterlist sg;
260         struct page *page = peer_req->pages;
261         struct page *tmp;
262         unsigned len;
263
264         desc.tfm = tfm;
265         desc.flags = 0;
266
267         sg_init_table(&sg, 1);
268         crypto_hash_init(&desc);
269
270         while ((tmp = page_chain_next(page))) {
271                 /* all but the last page will be fully used */
272                 sg_set_page(&sg, page, PAGE_SIZE, 0);
273                 crypto_hash_update(&desc, &sg, sg.length);
274                 page = tmp;
275         }
276         /* and now the last, possibly only partially used page */
277         len = peer_req->i.size & (PAGE_SIZE - 1);
278         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
279         crypto_hash_update(&desc, &sg, sg.length);
280         crypto_hash_final(&desc, digest);
281 }
282
283 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
284 {
285         struct hash_desc desc;
286         struct scatterlist sg;
287         struct bio_vec *bvec;
288         int i;
289
290         desc.tfm = tfm;
291         desc.flags = 0;
292
293         sg_init_table(&sg, 1);
294         crypto_hash_init(&desc);
295
296         __bio_for_each_segment(bvec, bio, i, 0) {
297                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
298                 crypto_hash_update(&desc, &sg, sg.length);
299         }
300         crypto_hash_final(&desc, digest);
301 }
302
303 /* MAYBE merge common code with w_e_end_ov_req */
304 static int w_e_send_csum(struct drbd_work *w, int cancel)
305 {
306         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
307         struct drbd_conf *mdev = w->mdev;
308         int digest_size;
309         void *digest;
310         int err = 0;
311
312         if (unlikely(cancel))
313                 goto out;
314
315         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
316                 goto out;
317
318         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
319         digest = kmalloc(digest_size, GFP_NOIO);
320         if (digest) {
321                 sector_t sector = peer_req->i.sector;
322                 unsigned int size = peer_req->i.size;
323                 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
324                 /* Free peer_req and pages before send.
325                  * In case we block on congestion, we could otherwise run into
326                  * some distributed deadlock, if the other side blocks on
327                  * congestion as well, because our receiver blocks in
328                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
329                 drbd_free_peer_req(mdev, peer_req);
330                 peer_req = NULL;
331                 inc_rs_pending(mdev);
332                 err = drbd_send_drequest_csum(mdev, sector, size,
333                                               digest, digest_size,
334                                               P_CSUM_RS_REQUEST);
335                 kfree(digest);
336         } else {
337                 dev_err(DEV, "kmalloc() of digest failed.\n");
338                 err = -ENOMEM;
339         }
340
341 out:
342         if (peer_req)
343                 drbd_free_peer_req(mdev, peer_req);
344
345         if (unlikely(err))
346                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
347         return err;
348 }
349
350 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
351
352 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
353 {
354         struct drbd_peer_request *peer_req;
355
356         if (!get_ldev(mdev))
357                 return -EIO;
358
359         if (drbd_rs_should_slow_down(mdev, sector))
360                 goto defer;
361
362         /* GFP_TRY, because if there is no memory available right now, this may
363          * be rescheduled for later. It is "only" background resync, after all. */
364         peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
365                                        size, GFP_TRY);
366         if (!peer_req)
367                 goto defer;
368
369         peer_req->w.cb = w_e_send_csum;
370         spin_lock_irq(&mdev->tconn->req_lock);
371         list_add(&peer_req->w.list, &mdev->read_ee);
372         spin_unlock_irq(&mdev->tconn->req_lock);
373
374         atomic_add(size >> 9, &mdev->rs_sect_ev);
375         if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
376                 return 0;
377
378         /* If it failed because of ENOMEM, retry should help.  If it failed
379          * because bio_add_page failed (probably broken lower level driver),
380          * retry may or may not help.
381          * If it does not, you may need to force disconnect. */
382         spin_lock_irq(&mdev->tconn->req_lock);
383         list_del(&peer_req->w.list);
384         spin_unlock_irq(&mdev->tconn->req_lock);
385
386         drbd_free_peer_req(mdev, peer_req);
387 defer:
388         put_ldev(mdev);
389         return -EAGAIN;
390 }
391
392 int w_resync_timer(struct drbd_work *w, int cancel)
393 {
394         struct drbd_conf *mdev = w->mdev;
395         switch (mdev->state.conn) {
396         case C_VERIFY_S:
397                 w_make_ov_request(w, cancel);
398                 break;
399         case C_SYNC_TARGET:
400                 w_make_resync_request(w, cancel);
401                 break;
402         }
403
404         return 0;
405 }
406
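/* The resync timer does no work itself: it merely re-queues resync_work on
 * the connection's worker queue (unless it is still queued), so the actual
 * request generation always runs in worker context, via w_resync_timer()
 * above, which dispatches to w_make_ov_request() or w_make_resync_request()
 * depending on the connection state. */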
407 void resync_timer_fn(unsigned long data)
408 {
409         struct drbd_conf *mdev = (struct drbd_conf *) data;
410
411         if (list_empty(&mdev->resync_work.list))
412                 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
413 }
414
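/* The fifo_buffer below is the resync controller's "plan": a fixed-size ring
 * of per-step corrections.  fifo_add_val() adds a value to every slot, i.e.
 * spreads a correction over the whole planning horizon; fifo_push() returns
 * the correction planned for the current step and plants a new value in its
 * place for one horizon later.  Its size corresponds to c_plan_ahead, see
 * the comment on "steps" in drbd_rs_controller(). */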
415 static void fifo_set(struct fifo_buffer *fb, int value)
416 {
417         int i;
418
419         for (i = 0; i < fb->size; i++)
420                 fb->values[i] = value;
421 }
422
423 static int fifo_push(struct fifo_buffer *fb, int value)
424 {
425         int ov;
426
427         ov = fb->values[fb->head_index];
428         fb->values[fb->head_index++] = value;
429
430         if (fb->head_index >= fb->size)
431                 fb->head_index = 0;
432
433         return ov;
434 }
435
436 static void fifo_add_val(struct fifo_buffer *fb, int value)
437 {
438         int i;
439
440         for (i = 0; i < fb->size; i++)
441                 fb->values[i] += value;
442 }
443
444 struct fifo_buffer *fifo_alloc(int fifo_size)
445 {
446         struct fifo_buffer *fb;
447
448         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_KERNEL);
449         if (!fb)
450                 return NULL;
451
452         fb->head_index = 0;
453         fb->size = fifo_size;
454         fb->total = 0;
455
456         return fb;
457 }
458
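/* drbd_rs_controller() is a simple feedback controller for the resync rate.
 * Roughly: decide how many sectors should be "in flight" (the configured
 * c_fill_target, or, Little's-law style, the reply rate of the last step
 * times c_delay_target), compare that with what is in flight plus what is
 * already planned, spread the difference over the planning horizon, and
 * request this step's share on top of what just came in, clipped to
 * c_max_rate.
 *
 * Illustration only, with made-up numbers and assuming SLEEP_TIME is one
 * 100 ms step: c_delay_target = 10 (1 s) and sect_in = 2048 sectors (1 MiB)
 * give want = 2048 * 10 = 20480 sectors; with 16384 sectors in flight and
 * 2048 already planned, the correction is 2048 sectors, spread evenly over
 * plan->size steps. */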
459 static int drbd_rs_controller(struct drbd_conf *mdev)
460 {
461         struct disk_conf *dc;
462         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
463         unsigned int want;     /* The number of sectors we want in the proxy */
464         int req_sect; /* Number of sectors to request in this turn */
465         int correction; /* Number of sectors more we need in the proxy*/
466         int cps; /* correction per invocation of drbd_rs_controller() */
467         int steps; /* Number of time steps to plan ahead */
468         int curr_corr;
469         int max_sect;
470         struct fifo_buffer *plan;
471
472         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
473         mdev->rs_in_flight -= sect_in;
474
475         dc = rcu_dereference(mdev->ldev->disk_conf);
476         plan = rcu_dereference(mdev->rs_plan_s);
477
478         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
479
480         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
481                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
482         } else { /* normal path */
483                 want = dc->c_fill_target ? dc->c_fill_target :
484                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
485         }
486
487         correction = want - mdev->rs_in_flight - plan->total;
488
489         /* Plan ahead */
490         cps = correction / steps;
491         fifo_add_val(plan, cps);
492         plan->total += cps * steps;
493
494         /* What we do in this step */
495         curr_corr = fifo_push(plan, 0);
496         plan->total -= curr_corr;
497
498         req_sect = sect_in + curr_corr;
499         if (req_sect < 0)
500                 req_sect = 0;
501
502         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
503         if (req_sect > max_sect)
504                 req_sect = max_sect;
505
506         /*
507         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
508                  sect_in, mdev->rs_in_flight, want, correction,
509                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
510         */
511
512         return req_sect;
513 }
514
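/* drbd_rs_number_requests() converts the controller output (sectors per
 * SLEEP_TIME step) into a number of resync requests: the shift by
 * (BM_BLOCK_SHIFT - 9) turns 512-byte sectors into bitmap-block-sized
 * requests (with the usual 4 KiB bitmap granularity that is a shift by 3).
 * Either way, c_sync_rate ends up holding the current rate in KiB/s. */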
515 static int drbd_rs_number_requests(struct drbd_conf *mdev)
516 {
517         int number;
518
519         rcu_read_lock();
520         if (rcu_dereference(mdev->rs_plan_s)->size) {
521                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
522                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
523         } else {
524                 mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
525                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
526         }
527         rcu_read_unlock();
528
529         /* ignore the number of pending requests; the resync controller should
530          * throttle down to the incoming reply rate soon enough anyway. */
531         return number;
532 }
533
534 int w_make_resync_request(struct drbd_work *w, int cancel)
535 {
536         struct drbd_conf *mdev = w->mdev;
537         unsigned long bit;
538         sector_t sector;
539         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
540         int max_bio_size;
541         int number, rollback_i, size;
542         int align, queued, sndbuf;
543         int i = 0;
544
545         if (unlikely(cancel))
546                 return 0;
547
548         if (mdev->rs_total == 0) {
549                 /* empty resync? */
550                 drbd_resync_finished(mdev);
551                 return 0;
552         }
553
554         if (!get_ldev(mdev)) {
555                 /* Since we only need to access the resync bookkeeping here,
556                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
557                    continuing a resync with a broken disk makes no sense at
558                    all */
559                 dev_err(DEV, "Disk broke down during resync!\n");
560                 return 0;
561         }
562
563         max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
564         number = drbd_rs_number_requests(mdev);
565         if (number == 0)
566                 goto requeue;
567
568         for (i = 0; i < number; i++) {
569                 /* Stop generating RS requests when half of the send buffer is filled */
570                 mutex_lock(&mdev->tconn->data.mutex);
571                 if (mdev->tconn->data.socket) {
572                         queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
573                         sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
574                 } else {
575                         queued = 1;
576                         sndbuf = 0;
577                 }
578                 mutex_unlock(&mdev->tconn->data.mutex);
579                 if (queued > sndbuf / 2)
580                         goto requeue;
581
582 next_sector:
583                 size = BM_BLOCK_SIZE;
584                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
585
586                 if (bit == DRBD_END_OF_BITMAP) {
587                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
588                         put_ldev(mdev);
589                         return 0;
590                 }
591
592                 sector = BM_BIT_TO_SECT(bit);
593
594                 if (drbd_rs_should_slow_down(mdev, sector) ||
595                     drbd_try_rs_begin_io(mdev, sector)) {
596                         mdev->bm_resync_fo = bit;
597                         goto requeue;
598                 }
599                 mdev->bm_resync_fo = bit + 1;
600
601                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
602                         drbd_rs_complete_io(mdev, sector);
603                         goto next_sector;
604                 }
605
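                /* The block below tries to merge adjacent dirty bits into one
                 * larger, aligned request.  Growth stops as soon as the next
                 * block would exceed max_bio_size, the start sector is not
                 * aligned for the next size class (the "align" mask), the
                 * request would cross a resync extent boundary, or the next
                 * bit is not actually dirty.  Small illustration with made-up
                 * numbers: with align == 1 the mask is (1 << 4) - 1 == 15, so
                 * a request starting at sector 1680 (1680 & 15 == 0) may grow
                 * beyond one bitmap block, while one starting at sector 1688
                 * (1688 & 15 == 8) stays a single block. */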
606 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
607                 /* try to find some adjacent bits.
608                  * we stop if we already have the maximum req size.
609                  *
610                  * Additionally always align bigger requests, in order to
611                  * be prepared for all stripe sizes of software RAIDs.
612                  */
613                 align = 1;
614                 rollback_i = i;
615                 for (;;) {
616                         if (size + BM_BLOCK_SIZE > max_bio_size)
617                                 break;
618
619                         /* Always stay aligned */
620                         if (sector & ((1<<(align+3))-1))
621                                 break;
622
623                         /* do not cross extent boundaries */
624                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
625                                 break;
626                         /* now, is it actually dirty, after all?
627                          * caution: drbd_bm_test_bit() is tri-state for some
628                          * obscure reason; testing for (b == 0) would treat the
629                          * out-of-band case correctly only by accident, because
630                          * of the "oddly sized" adjustment below */
631                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
632                                 break;
633                         bit++;
634                         size += BM_BLOCK_SIZE;
635                         if ((BM_BLOCK_SIZE << align) <= size)
636                                 align++;
637                         i++;
638                 }
639                 /* if we merged some,
640                  * reset the offset to start the next drbd_bm_find_next from */
641                 if (size > BM_BLOCK_SIZE)
642                         mdev->bm_resync_fo = bit + 1;
643 #endif
644
645                 /* adjust very last sectors, in case we are oddly sized */
646                 if (sector + (size>>9) > capacity)
647                         size = (capacity-sector)<<9;
648                 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
649                         switch (read_for_csum(mdev, sector, size)) {
650                         case -EIO: /* Disk failure */
651                                 put_ldev(mdev);
652                                 return -EIO;
653                         case -EAGAIN: /* allocation failed, or ldev busy */
654                                 drbd_rs_complete_io(mdev, sector);
655                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
656                                 i = rollback_i;
657                                 goto requeue;
658                         case 0:
659                                 /* everything ok */
660                                 break;
661                         default:
662                                 BUG();
663                         }
664                 } else {
665                         int err;
666
667                         inc_rs_pending(mdev);
668                         err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669                                                  sector, size, ID_SYNCER);
670                         if (err) {
671                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
672                                 dec_rs_pending(mdev);
673                                 put_ldev(mdev);
674                                 return err;
675                         }
676                 }
677         }
678
679         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
680                 /* last syncer _request_ was sent,
681                  * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
682                  * next sync group will resume), as soon as we receive the last
683                  * resync data block, and the last bit is cleared.
684                  * until then resync "work" is "inactive" ...
685                  */
686                 put_ldev(mdev);
687                 return 0;
688         }
689
690  requeue:
691         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693         put_ldev(mdev);
694         return 0;
695 }
696
697 static int w_make_ov_request(struct drbd_work *w, int cancel)
698 {
699         struct drbd_conf *mdev = w->mdev;
700         int number, i, size;
701         sector_t sector;
702         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
703
704         if (unlikely(cancel))
705                 return 1;
706
707         number = drbd_rs_number_requests(mdev);
708
709         sector = mdev->ov_position;
710         for (i = 0; i < number; i++) {
711                 if (sector >= capacity) {
712                         return 1;
713                 }
714
715                 size = BM_BLOCK_SIZE;
716
717                 if (drbd_rs_should_slow_down(mdev, sector) ||
718                     drbd_try_rs_begin_io(mdev, sector)) {
719                         mdev->ov_position = sector;
720                         goto requeue;
721                 }
722
723                 if (sector + (size>>9) > capacity)
724                         size = (capacity-sector)<<9;
725
726                 inc_rs_pending(mdev);
727                 if (drbd_send_ov_request(mdev, sector, size)) {
728                         dec_rs_pending(mdev);
729                         return 0;
730                 }
731                 sector += BM_SECT_PER_BIT;
732         }
733         mdev->ov_position = sector;
734
735  requeue:
736         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
737         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
738         return 1;
739 }
740
741 int w_ov_finished(struct drbd_work *w, int cancel)
742 {
743         struct drbd_conf *mdev = w->mdev;
744         kfree(w);
745         ov_out_of_sync_print(mdev);
746         drbd_resync_finished(mdev);
747
748         return 0;
749 }
750
751 static int w_resync_finished(struct drbd_work *w, int cancel)
752 {
753         struct drbd_conf *mdev = w->mdev;
754         kfree(w);
755
756         drbd_resync_finished(mdev);
757
758         return 0;
759 }
760
761 static void ping_peer(struct drbd_conf *mdev)
762 {
763         struct drbd_tconn *tconn = mdev->tconn;
764
765         clear_bit(GOT_PING_ACK, &tconn->flags);
766         request_ping(tconn);
767         wait_event(tconn->ping_wait,
768                    test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
769 }
770
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773         unsigned long db, dt, dbdt;
774         unsigned long n_oos;
775         union drbd_state os, ns;
776         struct drbd_work *w;
777         char *khelper_cmd = NULL;
778         int verify_done = 0;
779
780         /* Remove all elements from the resync LRU.  Since future actions
781          * might set bits in the (main) bitmap, the entries in the
782          * resync LRU would otherwise be wrong. */
783         if (drbd_rs_del_all(mdev)) {
784                 /* In case this is not possible now, most probably because
785                  * there are P_RS_DATA_REPLY packets lingering on the worker's
786                  * queue (or the read operations for those packets have
787                  * not finished yet), retry in 100 ms. */
788
789                 schedule_timeout_interruptible(HZ / 10);
790                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
791                 if (w) {
792                         w->cb = w_resync_finished;
793                         drbd_queue_work(&mdev->tconn->data.work, w);
794                         return 1;
795                 }
796                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
797         }
798
799         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
800         if (dt <= 0)
801                 dt = 1;
802         db = mdev->rs_total;
803         dbdt = Bit2KB(db/dt);
804         mdev->rs_paused /= HZ;
805
806         if (!get_ldev(mdev))
807                 goto out;
808
809         ping_peer(mdev);
810
811         spin_lock_irq(&mdev->tconn->req_lock);
812         os = drbd_read_state(mdev);
813
814         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
815
816         /* This protects us against multiple calls (that can happen in the presence
817            of application IO), and against connectivity loss just before we arrive here. */
818         if (os.conn <= C_CONNECTED)
819                 goto out_unlock;
820
821         ns = os;
822         ns.conn = C_CONNECTED;
823
824         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
825              verify_done ? "Online verify " : "Resync",
826              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827
828         n_oos = drbd_bm_total_weight(mdev);
829
830         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831                 if (n_oos) {
832                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
833                               n_oos, Bit2KB(1));
834                         khelper_cmd = "out-of-sync";
835                 }
836         } else {
837                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
838
839                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840                         khelper_cmd = "after-resync-target";
841
842                 if (mdev->tconn->csums_tfm && mdev->rs_total) {
843                         const unsigned long s = mdev->rs_same_csum;
844                         const unsigned long t = mdev->rs_total;
845                         const int ratio =
846                                 (t == 0)     ? 0 :
847                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
848                         dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
849                              "transferred %luK total %luK\n",
850                              ratio,
851                              Bit2KB(mdev->rs_same_csum),
852                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total));
854                 }
855         }
856
857         if (mdev->rs_failed) {
858                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861                         ns.disk = D_INCONSISTENT;
862                         ns.pdsk = D_UP_TO_DATE;
863                 } else {
864                         ns.disk = D_UP_TO_DATE;
865                         ns.pdsk = D_INCONSISTENT;
866                 }
867         } else {
868                 ns.disk = D_UP_TO_DATE;
869                 ns.pdsk = D_UP_TO_DATE;
870
871                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872                         if (mdev->p_uuid) {
873                                 int i;
874                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878                         } else {
879                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880                         }
881                 }
882
883                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
884                         /* for verify runs, we don't update uuids here,
885                          * so there would be nothing to report. */
886                         drbd_uuid_set_bm(mdev, 0UL);
887                         drbd_print_uuids(mdev, "updated UUIDs");
888                         if (mdev->p_uuid) {
889                                 /* Now the two UUID sets are equal, update what we
890                                  * know of the peer. */
891                                 int i;
892                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
893                                         mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
894                         }
895                 }
896         }
897
898         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
899 out_unlock:
900         spin_unlock_irq(&mdev->tconn->req_lock);
901         put_ldev(mdev);
902 out:
903         mdev->rs_total  = 0;
904         mdev->rs_failed = 0;
905         mdev->rs_paused = 0;
906         if (verify_done)
907                 mdev->ov_start_sector = 0;
908
909         drbd_md_sync(mdev);
910
911         if (khelper_cmd)
912                 drbd_khelper(mdev, khelper_cmd);
913
914         return 1;
915 }
916
917 /* helper */
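/* If the network layer may still reference the pages of a peer request
 * (sendpage() not finished), the request cannot be freed yet: it is moved to
 * net_ee instead and its pages are re-accounted from pp_in_use to
 * pp_in_use_by_net.  The page count is the request size rounded up to whole
 * pages: (i.size + PAGE_SIZE - 1) >> PAGE_SHIFT. */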
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
919 {
920         if (drbd_peer_req_has_active_page(peer_req)) {
921                 /* This might happen if sendpage() has not finished */
922                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
923                 atomic_add(i, &mdev->pp_in_use_by_net);
924                 atomic_sub(i, &mdev->pp_in_use);
925                 spin_lock_irq(&mdev->tconn->req_lock);
926                 list_add_tail(&peer_req->w.list, &mdev->net_ee);
927                 spin_unlock_irq(&mdev->tconn->req_lock);
928                 wake_up(&drbd_pp_wait);
929         } else
930                 drbd_free_peer_req(mdev, peer_req);
931 }
932
933 /**
934  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935  * @mdev:       DRBD device.
936  * @w:          work object.
937  * @cancel:     The connection will be closed anyway
938  */
939 int w_e_end_data_req(struct drbd_work *w, int cancel)
940 {
941         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
942         struct drbd_conf *mdev = w->mdev;
943         int err;
944
945         if (unlikely(cancel)) {
946                 drbd_free_peer_req(mdev, peer_req);
947                 dec_unacked(mdev);
948                 return 0;
949         }
950
951         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
952                 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
953         } else {
954                 if (__ratelimit(&drbd_ratelimit_state))
955                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
956                             (unsigned long long)peer_req->i.sector);
957
958                 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
959         }
960
961         dec_unacked(mdev);
962
963         move_to_net_ee_or_free(mdev, peer_req);
964
965         if (unlikely(err))
966                 dev_err(DEV, "drbd_send_block() failed\n");
967         return err;
968 }
969
970 /**
971  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
972  * @mdev:       DRBD device.
973  * @w:          work object.
974  * @cancel:     The connection will be closed anyway
975  */
976 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
977 {
978         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
979         struct drbd_conf *mdev = w->mdev;
980         int err;
981
982         if (unlikely(cancel)) {
983                 drbd_free_peer_req(mdev, peer_req);
984                 dec_unacked(mdev);
985                 return 0;
986         }
987
988         if (get_ldev_if_state(mdev, D_FAILED)) {
989                 drbd_rs_complete_io(mdev, peer_req->i.sector);
990                 put_ldev(mdev);
991         }
992
993         if (mdev->state.conn == C_AHEAD) {
994                 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
995         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
996                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
997                         inc_rs_pending(mdev);
998                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
999                 } else {
1000                         if (__ratelimit(&drbd_ratelimit_state))
1001                                 dev_err(DEV, "Not sending RSDataReply, "
1002                                     "partner DISKLESS!\n");
1003                         err = 0;
1004                 }
1005         } else {
1006                 if (__ratelimit(&drbd_ratelimit_state))
1007                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1008                             (unsigned long long)peer_req->i.sector);
1009
1010                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1011
1012                 /* update resync data with failure */
1013                 drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1014         }
1015
1016         dec_unacked(mdev);
1017
1018         move_to_net_ee_or_free(mdev, peer_req);
1019
1020         if (unlikely(err))
1021                 dev_err(DEV, "drbd_send_block() failed\n");
1022         return err;
1023 }
1024
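/* Checksum-based resync reply: if the digest received from the peer matches
 * the digest of our local data, the block does not need to be transferred;
 * we only send P_RS_IS_IN_SYNC and mark the area in sync.  Otherwise the
 * full block goes out as P_RS_DATA_REPLY, and the digest is dropped first,
 * since reusing block_id invalidates the digest pointer (see the comments
 * in the function below). */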
1025 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1026 {
1027         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1028         struct drbd_conf *mdev = w->mdev;
1029         struct digest_info *di;
1030         int digest_size;
1031         void *digest = NULL;
1032         int err, eq = 0;
1033
1034         if (unlikely(cancel)) {
1035                 drbd_free_peer_req(mdev, peer_req);
1036                 dec_unacked(mdev);
1037                 return 0;
1038         }
1039
1040         if (get_ldev(mdev)) {
1041                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1042                 put_ldev(mdev);
1043         }
1044
1045         di = peer_req->digest;
1046
1047         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1048                 /* quick hack to try to avoid a race against reconfiguration.
1049                  * a real fix would be much more involved,
1050                  * introducing more locking mechanisms */
1051                 if (mdev->tconn->csums_tfm) {
1052                         digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1053                         D_ASSERT(digest_size == di->digest_size);
1054                         digest = kmalloc(digest_size, GFP_NOIO);
1055                 }
1056                 if (digest) {
1057                         drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1058                         eq = !memcmp(digest, di->digest, digest_size);
1059                         kfree(digest);
1060                 }
1061
1062                 if (eq) {
1063                         drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1064                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1065                         mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1066                         err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1067                 } else {
1068                         inc_rs_pending(mdev);
1069                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1070                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1071                         kfree(di);
1072                         err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1073                 }
1074         } else {
1075                 err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1076                 if (__ratelimit(&drbd_ratelimit_state))
1077                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1078         }
1079
1080         dec_unacked(mdev);
1081         move_to_net_ee_or_free(mdev, peer_req);
1082
1083         if (unlikely(err))
1084                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1085         return err;
1086 }
1087
1088 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1089 {
1090         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091         struct drbd_conf *mdev = w->mdev;
1092         sector_t sector = peer_req->i.sector;
1093         unsigned int size = peer_req->i.size;
1094         int digest_size;
1095         void *digest;
1096         int err = 0;
1097
1098         if (unlikely(cancel))
1099                 goto out;
1100
1101         digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1102         digest = kmalloc(digest_size, GFP_NOIO);
1103         if (!digest) {
1104                 err = 1;        /* terminate the connection in case the allocation failed */
1105                 goto out;
1106         }
1107
1108         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1109                 drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1110         else
1111                 memset(digest, 0, digest_size);
1112
1113         /* Free peer_req and pages before send.
1114          * In case we block on congestion, we could otherwise run into
1115          * some distributed deadlock, if the other side blocks on
1116          * congestion as well, because our receiver blocks in
1117          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1118         drbd_free_peer_req(mdev, peer_req);
1119         peer_req = NULL;
1120         inc_rs_pending(mdev);
1121         err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1122         if (err)
1123                 dec_rs_pending(mdev);
1124         kfree(digest);
1125
1126 out:
1127         if (peer_req)
1128                 drbd_free_peer_req(mdev, peer_req);
1129         dec_unacked(mdev);
1130         return err;
1131 }
1132
1133 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1134 {
1135         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1136                 mdev->ov_last_oos_size += size>>9;
1137         } else {
1138                 mdev->ov_last_oos_start = sector;
1139                 mdev->ov_last_oos_size = size>>9;
1140         }
1141         drbd_set_out_of_sync(mdev, sector, size);
1142 }
1143
1144 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1145 {
1146         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1147         struct drbd_conf *mdev = w->mdev;
1148         struct digest_info *di;
1149         void *digest;
1150         sector_t sector = peer_req->i.sector;
1151         unsigned int size = peer_req->i.size;
1152         int digest_size;
1153         int err, eq = 0;
1154
1155         if (unlikely(cancel)) {
1156                 drbd_free_peer_req(mdev, peer_req);
1157                 dec_unacked(mdev);
1158                 return 0;
1159         }
1160
1161         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1162          * the resync lru has been cleaned up already */
1163         if (get_ldev(mdev)) {
1164                 drbd_rs_complete_io(mdev, peer_req->i.sector);
1165                 put_ldev(mdev);
1166         }
1167
1168         di = peer_req->digest;
1169
1170         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1171                 digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1172                 digest = kmalloc(digest_size, GFP_NOIO);
1173                 if (digest) {
1174                         drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1175
1176                         D_ASSERT(digest_size == di->digest_size);
1177                         eq = !memcmp(digest, di->digest, digest_size);
1178                         kfree(digest);
1179                 }
1180         }
1181
1182         /* Free peer_req and pages before send.
1183          * In case we block on congestion, we could otherwise run into
1184          * some distributed deadlock, if the other side blocks on
1185          * congestion as well, because our receiver blocks in
1186          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1187         drbd_free_peer_req(mdev, peer_req);
1188         if (!eq)
1189                 drbd_ov_out_of_sync_found(mdev, sector, size);
1190         else
1191                 ov_out_of_sync_print(mdev);
1192
1193         err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1194                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1195
1196         dec_unacked(mdev);
1197
1198         --mdev->ov_left;
1199
1200         /* let's advance progress step marks only for every other megabyte */
1201         if ((mdev->ov_left & 0x200) == 0x200)
1202                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1203
1204         if (mdev->ov_left == 0) {
1205                 ov_out_of_sync_print(mdev);
1206                 drbd_resync_finished(mdev);
1207         }
1208
1209         return err;
1210 }
1211
1212 int w_prev_work_done(struct drbd_work *w, int cancel)
1213 {
1214         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1215
1216         complete(&b->done);
1217         return 0;
1218 }
1219
1220 int w_send_barrier(struct drbd_work *w, int cancel)
1221 {
1222         struct drbd_socket *sock;
1223         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1224         struct drbd_conf *mdev = w->mdev;
1225         struct p_barrier *p;
1226
1227         /* really avoid racing with tl_clear.  w.cb may have been referenced
1228          * just before it was reassigned and re-queued, so double check that.
1229          * actually, this race was harmless, since we only try to send the
1230          * barrier packet here, and otherwise do nothing with the object.
1231          * but compare with the head of w_clear_epoch */
1232         spin_lock_irq(&mdev->tconn->req_lock);
1233         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1234                 cancel = 1;
1235         spin_unlock_irq(&mdev->tconn->req_lock);
1236         if (cancel)
1237                 return 0;
1238
1239         sock = &mdev->tconn->data;
1240         p = drbd_prepare_command(mdev, sock);
1241         if (!p)
1242                 return -EIO;
1243         p->barrier = b->br_number;
1244         /* inc_ap_pending was done where this was queued.
1245          * dec_ap_pending will be done in got_BarrierAck
1246          * or (on connection loss) in w_clear_epoch.  */
1247         return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1248 }
1249
1250 int w_send_write_hint(struct drbd_work *w, int cancel)
1251 {
1252         struct drbd_conf *mdev = w->mdev;
1253         struct drbd_socket *sock;
1254
1255         if (cancel)
1256                 return 0;
1257         sock = &mdev->tconn->data;
1258         if (!drbd_prepare_command(mdev, sock))
1259                 return -EIO;
1260         return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1261 }
1262
1263 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1264 {
1265         struct drbd_request *req = container_of(w, struct drbd_request, w);
1266         struct drbd_conf *mdev = w->mdev;
1267         int err;
1268
1269         if (unlikely(cancel)) {
1270                 req_mod(req, SEND_CANCELED);
1271                 return 0;
1272         }
1273
1274         err = drbd_send_out_of_sync(mdev, req);
1275         req_mod(req, OOS_HANDED_TO_NETWORK);
1276
1277         return err;
1278 }
1279
1280 /**
1281  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1282  * @mdev:       DRBD device.
1283  * @w:          work object.
1284  * @cancel:     The connection will be closed anyway
1285  */
1286 int w_send_dblock(struct drbd_work *w, int cancel)
1287 {
1288         struct drbd_request *req = container_of(w, struct drbd_request, w);
1289         struct drbd_conf *mdev = w->mdev;
1290         int err;
1291
1292         if (unlikely(cancel)) {
1293                 req_mod(req, SEND_CANCELED);
1294                 return 0;
1295         }
1296
1297         err = drbd_send_dblock(mdev, req);
1298         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1299
1300         return err;
1301 }
1302
1303 /**
1304  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1305  * @mdev:       DRBD device.
1306  * @w:          work object.
1307  * @cancel:     The connection will be closed anyway
1308  */
1309 int w_send_read_req(struct drbd_work *w, int cancel)
1310 {
1311         struct drbd_request *req = container_of(w, struct drbd_request, w);
1312         struct drbd_conf *mdev = w->mdev;
1313         int err;
1314
1315         if (unlikely(cancel)) {
1316                 req_mod(req, SEND_CANCELED);
1317                 return 0;
1318         }
1319
1320         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1321                                  (unsigned long)req);
1322
1323         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1324
1325         return err;
1326 }
1327
1328 int w_restart_disk_io(struct drbd_work *w, int cancel)
1329 {
1330         struct drbd_request *req = container_of(w, struct drbd_request, w);
1331         struct drbd_conf *mdev = w->mdev;
1332
1333         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1334                 drbd_al_begin_io(mdev, &req->i);
1335         /* Calling drbd_al_begin_io() out of the worker might deadlock
1336            theoretically. Practically it cannot deadlock, since this is
1337            only used when unfreezing IOs. All the extents of the requests
1338            that made it into the TL are already active */
1339
1340         drbd_req_make_private_bio(req, req->master_bio);
1341         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1342         generic_make_request(req->private_bio);
1343
1344         return 0;
1345 }
1346
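/* resync-after dependencies: a device may be configured to resync only
 * "after" another minor.  _drbd_may_sync_now() walks that chain via
 * disk_conf->resync_after until it hits -1 (no dependency), and returns 0
 * as soon as any device along the chain is itself resyncing or has one of
 * the pause flags (aftr_isp, peer_isp, user_isp) set.  _drbd_pause_after()
 * and _drbd_resume_next() then toggle aftr_isp on the affected devices;
 * drbd_resync_after_valid() rejects dependency cycles. */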
1347 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1348 {
1349         struct drbd_conf *odev = mdev;
1350         int resync_after;
1351
1352         while (1) {
1353                 if (!odev->ldev)
1354                         return 1;
1355                 rcu_read_lock();
1356                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1357                 rcu_read_unlock();
1358                 if (resync_after == -1)
1359                         return 1;
1360                 odev = minor_to_mdev(resync_after);
1361                 if (!expect(odev))
1362                         return 1;
1363                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1364                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1365                     odev->state.aftr_isp || odev->state.peer_isp ||
1366                     odev->state.user_isp)
1367                         return 0;
1368         }
1369 }
1370
1371 /**
1372  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1373  * @mdev:       DRBD device.
1374  *
1375  * Called from process context only (admin command and after_state_ch).
1376  */
1377 static int _drbd_pause_after(struct drbd_conf *mdev)
1378 {
1379         struct drbd_conf *odev;
1380         int i, rv = 0;
1381
1382         rcu_read_lock();
1383         idr_for_each_entry(&minors, odev, i) {
1384                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1385                         continue;
1386                 if (!_drbd_may_sync_now(odev))
1387                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1388                                != SS_NOTHING_TO_DO);
1389         }
1390         rcu_read_unlock();
1391
1392         return rv;
1393 }
1394
1395 /**
1396  * _drbd_resume_next() - Resume resync on all devices that may resync now
1397  * @mdev:       DRBD device.
1398  *
1399  * Called from process context only (admin command and worker).
1400  */
1401 static int _drbd_resume_next(struct drbd_conf *mdev)
1402 {
1403         struct drbd_conf *odev;
1404         int i, rv = 0;
1405
1406         rcu_read_lock();
1407         idr_for_each_entry(&minors, odev, i) {
1408                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1409                         continue;
1410                 if (odev->state.aftr_isp) {
1411                         if (_drbd_may_sync_now(odev))
1412                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1413                                                         CS_HARD, NULL)
1414                                        != SS_NOTHING_TO_DO) ;
1415                 }
1416         }
1417         rcu_read_unlock();
1418         return rv;
1419 }
1420
1421 void resume_next_sg(struct drbd_conf *mdev)
1422 {
1423         write_lock_irq(&global_state_lock);
1424         _drbd_resume_next(mdev);
1425         write_unlock_irq(&global_state_lock);
1426 }
1427
1428 void suspend_other_sg(struct drbd_conf *mdev)
1429 {
1430         write_lock_irq(&global_state_lock);
1431         _drbd_pause_after(mdev);
1432         write_unlock_irq(&global_state_lock);
1433 }
1434
1435 /* caller must hold global_state_lock */
1436 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1437 {
1438         struct drbd_conf *odev;
1439         int resync_after;
1440
1441         if (o_minor == -1)
1442                 return NO_ERROR;
1443         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1444                 return ERR_RESYNC_AFTER;
1445
1446         /* check for loops */
1447         odev = minor_to_mdev(o_minor);
1448         while (1) {
1449                 if (odev == mdev)
1450                         return ERR_RESYNC_AFTER_CYCLE;
1451
1452                 rcu_read_lock();
1453                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1454                 rcu_read_unlock();
1455                 /* dependency chain ends here, no cycles. */
1456                 if (resync_after == -1)
1457                         return NO_ERROR;
1458
1459                 /* follow the dependency chain */
1460                 odev = minor_to_mdev(resync_after);
1461         }
1462 }
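     /* Illustrative example (not from the original source): with resync-after
      * configured as  minor 2 -> 1,  minor 1 -> 0,  minor 0 -> -1,  the walk
      * above ends at -1 and the setting is accepted.  Additionally pointing
      * minor 0 at minor 2 would close the loop 0 -> 2 -> 1 -> 0, so that new
      * value is rejected with ERR_RESYNC_AFTER_CYCLE before it is applied. */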
1463
1464 /* caller must hold global_state_lock */
1465 void drbd_resync_after_changed(struct drbd_conf *mdev)
1466 {
1467         int changes;
1468
1469         do {
1470                 changes  = _drbd_pause_after(mdev);
1471                 changes |= _drbd_resume_next(mdev);
1472         } while (changes);
1473 }
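     /* The pause/resume pair is iterated to a fixed point on purpose: setting
      * aftr_isp on one device can force devices further down its resync-after
      * chain to pause in the next pass, and clearing it can likewise allow
      * their dependents to resume; we loop until a full pass changes nothing. */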
1474
1475 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1476 {
1477         struct fifo_buffer *plan;
1478
1479         atomic_set(&mdev->rs_sect_in, 0);
1480         atomic_set(&mdev->rs_sect_ev, 0);
1481         mdev->rs_in_flight = 0;
1482
1483         /* Updating the RCU-protected object in place is necessary since
1484            this function gets called from atomic context.
1485            It is valid since all other updates also lead to a completely
1486            empty fifo */
1487         rcu_read_lock();
1488         plan = rcu_dereference(mdev->rs_plan_s);
1489         plan->total = 0;
1490         fifo_set(plan, 0);
1491         rcu_read_unlock();
1492 }
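     /*
      * For contrast: a conventional RCU update of rs_plan_s from process
      * context would allocate a replacement buffer, publish it with
      * rcu_assign_pointer(), and free the old one only after a grace period.
      * A minimal sketch (new_size and the error handling are illustrative,
      * not the in-tree update path):
      *
      *	struct fifo_buffer *new_plan, *old_plan;
      *
      *	new_plan = kzalloc(sizeof(*new_plan) + new_size * sizeof(int), GFP_NOIO);
      *	if (!new_plan)
      *		return -ENOMEM;
      *	new_plan->size = new_size;
      *	old_plan = rcu_dereference_protected(mdev->rs_plan_s, 1);
      *	rcu_assign_pointer(mdev->rs_plan_s, new_plan);
      *	synchronize_rcu();
      *	kfree(old_plan);
      *
      * Both the GFP_NOIO allocation and synchronize_rcu() may sleep, which is
      * not an option in the atomic context this function can run in; hence the
      * in-place reset above.
      */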
1493
1494 void start_resync_timer_fn(unsigned long data)
1495 {
1496         struct drbd_conf *mdev = (struct drbd_conf *) data;
1497
1498         drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1499 }
1500
1501 int w_start_resync(struct drbd_work *w, int cancel)
1502 {
1503         struct drbd_conf *mdev = w->mdev;
1504
1505         if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1506                 dev_warn(DEV, "w_start_resync later...\n");
1507                 mdev->start_resync_timer.expires = jiffies + HZ/10;
1508                 add_timer(&mdev->start_resync_timer);
1509                 return 0;
1510         }
1511
1512         drbd_start_resync(mdev, C_SYNC_SOURCE);
1513         clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1514         return 0;
1515 }
1516
1517 /**
1518  * drbd_start_resync() - Start the resync process
1519  * @mdev:       DRBD device.
1520  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1521  *
1522  * This function might bring you directly into one of the
1523  * C_PAUSED_SYNC_* states.
1524  */
1525 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1526 {
1527         union drbd_state ns;
1528         int r;
1529
1530         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1531                 dev_err(DEV, "Resync already running!\n");
1532                 return;
1533         }
1534
1535         if (mdev->state.conn < C_AHEAD) {
1536                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1537                 drbd_rs_cancel_all(mdev);
1538                 /* This should be done when we abort the resync. We definitely do not
1539                    want to have this for connections going back and forth between
1540                    Ahead/Behind and SyncSource/SyncTarget */
1541         }
1542
1543         if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1544                 if (side == C_SYNC_TARGET) {
1545                         /* Since application IO was locked out during C_WF_BITMAP_T and
1546                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1547                            we check whether we are allowed to make the data inconsistent. */
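                              /* drbd_khelper() hands back the call_usermodehelper() status;
                               * the handler's exit code is expected in bits 8..15, hence the
                               * shift-and-mask below. */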
1548                         r = drbd_khelper(mdev, "before-resync-target");
1549                         r = (r >> 8) & 0xff;
1550                         if (r > 0) {
1551                                 dev_info(DEV, "before-resync-target handler returned %d, "
1552                                          "dropping connection.\n", r);
1553                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1554                                 return;
1555                         }
1556                 } else /* C_SYNC_SOURCE */ {
1557                         r = drbd_khelper(mdev, "before-resync-source");
1558                         r = (r >> 8) & 0xff;
1559                         if (r > 0) {
1560                                 if (r == 3) {
1561                                         dev_info(DEV, "before-resync-source handler returned %d, "
1562                                                  "ignoring. Old userland tools?\n", r);
1563                                 } else {
1564                                         dev_info(DEV, "before-resync-source handler returned %d, "
1565                                                  "dropping connection.\n", r);
1566                                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1567                                         return;
1568                                 }
1569                         }
1570                 }
1571         }
1572
1573         if (current == mdev->tconn->worker.task) {
1574                 /* The worker should not sleep waiting for state_mutex,
1575                    that can take a long time */
1576                 if (!mutex_trylock(mdev->state_mutex)) {
1577                         set_bit(B_RS_H_DONE, &mdev->flags);
1578                         mdev->start_resync_timer.expires = jiffies + HZ/5;
1579                         add_timer(&mdev->start_resync_timer);
1580                         return;
1581                 }
1582         } else {
1583                 mutex_lock(mdev->state_mutex);
1584         }
1585         clear_bit(B_RS_H_DONE, &mdev->flags);
1586
1587         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1588                 mutex_unlock(mdev->state_mutex);
1589                 return;
1590         }
1591
1592         write_lock_irq(&global_state_lock);
1593         ns = drbd_read_state(mdev);
1594
1595         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1596
1597         ns.conn = side;
1598
1599         if (side == C_SYNC_TARGET)
1600                 ns.disk = D_INCONSISTENT;
1601         else /* side == C_SYNC_SOURCE */
1602                 ns.pdsk = D_INCONSISTENT;
1603
1604         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1605         ns = drbd_read_state(mdev);
1606
1607         if (ns.conn < C_CONNECTED)
1608                 r = SS_UNKNOWN_ERROR;
1609
1610         if (r == SS_SUCCESS) {
1611                 unsigned long tw = drbd_bm_total_weight(mdev);
1612                 unsigned long now = jiffies;
1613                 int i;
1614
1615                 mdev->rs_failed    = 0;
1616                 mdev->rs_paused    = 0;
1617                 mdev->rs_same_csum = 0;
1618                 mdev->rs_last_events = 0;
1619                 mdev->rs_last_sect_ev = 0;
1620                 mdev->rs_total     = tw;
1621                 mdev->rs_start     = now;
1622                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1623                         mdev->rs_mark_left[i] = tw;
1624                         mdev->rs_mark_time[i] = now;
1625                 }
1626                 _drbd_pause_after(mdev);
1627         }
1628         write_unlock_irq(&global_state_lock);
1629
1630         if (r == SS_SUCCESS) {
1631                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1632                      drbd_conn_str(ns.conn),
1633                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1634                      (unsigned long) mdev->rs_total);
1635                 if (side == C_SYNC_TARGET)
1636                         mdev->bm_resync_fo = 0;
1637
1638                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1639                  * with w_send_oos, or the sync target will get confused as to
1640                  * how many bits to resync.  We cannot do that always, because for an
1641                  * empty resync and protocol < 95, we need to do it here, as we call
1642                  * drbd_resync_finished from here in that case.
1643                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1644                  * and from after_state_ch otherwise. */
1645                 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1646                         drbd_gen_and_send_sync_uuid(mdev);
1647
1648                 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1649                         /* This still has a race (about when exactly the peers
1650                          * detect connection loss) that can lead to a full sync
1651                          * on next handshake. In 8.3.9 we fixed this with explicit
1652                          * resync-finished notifications, but the fix
1653                          * introduces a protocol change.  Sleeping for some
1654                          * time longer than the ping interval + timeout on the
1655                          * SyncSource, to give the SyncTarget the chance to
1656                          * detect connection loss, then waiting for a ping
1657                          * response (implicit in drbd_resync_finished) reduces
1658                          * the race considerably, but does not solve it. */
1659                         if (side == C_SYNC_SOURCE) {
1660                                 struct net_conf *nc;
1661                                 int timeo;
1662
1663                                 rcu_read_lock();
1664                                 nc = rcu_dereference(mdev->tconn->net_conf);
1665                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1666                                 rcu_read_unlock();
1667                                 schedule_timeout_interruptible(timeo);
1668                         }
1669                         drbd_resync_finished(mdev);
1670                 }
1671
1672                 drbd_rs_controller_reset(mdev);
1673                 /* ns.conn may already be != mdev->state.conn,
1674                  * we may have been paused in between, or become paused until
1675                  * the timer triggers.
1676                  * No matter, that is handled in resync_timer_fn() */
1677                 if (ns.conn == C_SYNC_TARGET)
1678                         mod_timer(&mdev->resync_timer, jiffies);
1679
1680                 drbd_md_sync(mdev);
1681         }
1682         put_ldev(mdev);
1683         mutex_unlock(mdev->state_mutex);
1684 }
1685
1686 int drbd_worker(struct drbd_thread *thi)
1687 {
1688         struct drbd_tconn *tconn = thi->tconn;
1689         struct drbd_work *w = NULL;
1690         struct drbd_conf *mdev;
1691         struct net_conf *nc;
1692         LIST_HEAD(work_list);
1693         int vnr, intr = 0;
1694         int cork;
1695
1696         while (get_t_state(thi) == RUNNING) {
1697                 drbd_thread_current_set_cpu(thi);
1698
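                 /* If no work is immediately available (down_trylock() fails),
                  * uncork the data socket (when tcp_cork is configured) before
                  * blocking on the work semaphore, so packets that are already
                  * queued go out while we sleep; cork again after waking up. */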
1699                 if (down_trylock(&tconn->data.work.s)) {
1700                         mutex_lock(&tconn->data.mutex);
1701
1702                         rcu_read_lock();
1703                         nc = rcu_dereference(tconn->net_conf);
1704                         cork = nc ? nc->tcp_cork : 0;
1705                         rcu_read_unlock();
1706
1707                         if (tconn->data.socket && cork)
1708                                 drbd_tcp_uncork(tconn->data.socket);
1709                         mutex_unlock(&tconn->data.mutex);
1710
1711                         intr = down_interruptible(&tconn->data.work.s);
1712
1713                         mutex_lock(&tconn->data.mutex);
1714                         if (tconn->data.socket && cork)
1715                                 drbd_tcp_cork(tconn->data.socket);
1716                         mutex_unlock(&tconn->data.mutex);
1717                 }
1718
1719                 if (intr) {
1720                         flush_signals(current);
1721                         if (get_t_state(thi) == RUNNING) {
1722                                 conn_warn(tconn, "Worker got an unexpected signal\n");
1723                                 continue;
1724                         }
1725                         break;
1726                 }
1727
1728                 if (get_t_state(thi) != RUNNING)
1729                         break;
1730                 /* With this break, we have done a down() but not consumed
1731                    the entry from the list. The cleanup code takes care of
1732                    this...   */
1733
1734                 w = NULL;
1735                 spin_lock_irq(&tconn->data.work.q_lock);
1736                 if (list_empty(&tconn->data.work.q)) {
1737                         /* something is terribly wrong in our logic:
1738                          * we were able to down() the semaphore,
1739                          * but the list is empty... doh.
1740                          *
1741                          * what is the best thing to do now?
1742                          * try again from scratch, restarting the receiver,
1743                          * asender, whatnot? that could break things even worse,
1744                          * e.g. when we are primary, but have no good local data.
1745                          *
1746                          * I'll try to get away with just starting over this loop.
1747                          */
1748                         conn_warn(tconn, "Work list unexpectedly empty\n");
1749                         spin_unlock_irq(&tconn->data.work.q_lock);
1750                         continue;
1751                 }
1752                 w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1753                 list_del_init(&w->list);
1754                 spin_unlock_irq(&tconn->data.work.q_lock);
1755
1756                 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1757                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1758                         if (tconn->cstate >= C_WF_REPORT_PARAMS)
1759                                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1760                 }
1761         }
1762
1763         spin_lock_irq(&tconn->data.work.q_lock);
1764         while (!list_empty(&tconn->data.work.q)) {
1765                 list_splice_init(&tconn->data.work.q, &work_list);
1766                 spin_unlock_irq(&tconn->data.work.q_lock);
1767
1768                 while (!list_empty(&work_list)) {
1769                         w = list_entry(work_list.next, struct drbd_work, list);
1770                         list_del_init(&w->list);
1771                         w->cb(w, 1);
1772                 }
1773
1774                 spin_lock_irq(&tconn->data.work.q_lock);
1775         }
1776         sema_init(&tconn->data.work.s, 0);
1777         /* DANGEROUS race: if someone queued their work while holding the
1778          * spinlock, but called up() outside of it, we could get an up() on
1779          * the semaphore without a corresponding list entry.
1780          * So don't do that.
1781          */
1782         spin_unlock_irq(&tconn->data.work.q_lock);
1783
1784         rcu_read_lock();
1785         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1786                 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1787                 kref_get(&mdev->kref);
1788                 rcu_read_unlock();
1789                 drbd_mdev_cleanup(mdev);
1790                 kref_put(&mdev->kref, &drbd_minor_destroy);
1791                 rcu_read_lock();
1792         }
1793         rcu_read_unlock();
1794
1795         return 0;
1796 }