drbd: new configuration parameter c-min-rate
[linux-block.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61
62
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync-after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
68
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74         struct drbd_md_io *md_io;
75
76         md_io = (struct drbd_md_io *)bio->bi_private;
77         md_io->error = error;
78
79         complete(&md_io->event);
80 }
81
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87         unsigned long flags = 0;
88         struct drbd_conf *mdev = e->mdev;
89
90         D_ASSERT(e->block_id != ID_VACANT);
91
92         spin_lock_irqsave(&mdev->req_lock, flags);
93         mdev->read_cnt += e->size >> 9;
94         list_del(&e->w.list);
95         if (list_empty(&mdev->read_ee))
96                 wake_up(&mdev->ee_wait);
97         if (test_bit(__EE_WAS_ERROR, &e->flags))
98                 __drbd_chk_io_error(mdev, FALSE);
99         spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101         drbd_queue_work(&mdev->data.work, &e->w);
102         put_ldev(mdev);
103 }
104
105 static int is_failed_barrier(int ee_flags)
106 {
107         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108                         == (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_conf *mdev = e->mdev;
117         sector_t e_sector;
118         int do_wake;
119         int is_syncer_req;
120         int do_al_complete_io;
121
122         /* if this is a failed barrier request, disable use of barriers,
123          * and schedule for resubmission */
124         if (is_failed_barrier(e->flags)) {
125                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
126                 spin_lock_irqsave(&mdev->req_lock, flags);
127                 list_del(&e->w.list);
128                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129                 e->w.cb = w_e_reissue;
130                 /* put_ldev actually happens below, once we come here again. */
131                 __release(local);
132                 spin_unlock_irqrestore(&mdev->req_lock, flags);
133                 drbd_queue_work(&mdev->data.work, &e->w);
134                 return;
135         }
136
137         D_ASSERT(e->block_id != ID_VACANT);
138
139         /* after we moved e to done_ee,
140          * we may no longer access it,
141          * it may be freed/reused already!
142          * (as soon as we release the req_lock) */
143         e_sector = e->sector;
144         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145         is_syncer_req = is_syncer_block_id(e->block_id);
146
147         spin_lock_irqsave(&mdev->req_lock, flags);
148         mdev->writ_cnt += e->size >> 9;
149         list_del(&e->w.list); /* has been on active_ee or sync_ee */
150         list_add_tail(&e->w.list, &mdev->done_ee);
151
152         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153          * neither did we wake possibly waiting conflicting requests.
154          * done from "drbd_process_done_ee" within the appropriate w.cb
155          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157         do_wake = is_syncer_req
158                 ? list_empty(&mdev->sync_ee)
159                 : list_empty(&mdev->active_ee);
160
161         if (test_bit(__EE_WAS_ERROR, &e->flags))
162                 __drbd_chk_io_error(mdev, FALSE);
163         spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165         if (is_syncer_req)
166                 drbd_rs_complete_io(mdev, e_sector);
167
168         if (do_wake)
169                 wake_up(&mdev->ee_wait);
170
171         if (do_al_complete_io)
172                 drbd_al_complete_io(mdev, e_sector);
173
174         wake_asender(mdev);
175         put_ldev(mdev);
176 }
177
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183         struct drbd_epoch_entry *e = bio->bi_private;
184         struct drbd_conf *mdev = e->mdev;
185         int uptodate = bio_flagged(bio, BIO_UPTODATE);
186         int is_write = bio_data_dir(bio) == WRITE;
187
188         if (error)
189                 dev_warn(DEV, "%s: error=%d s=%llus\n",
190                                 is_write ? "write" : "read", error,
191                                 (unsigned long long)e->sector);
192         if (!error && !uptodate) {
193                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194                                 is_write ? "write" : "read",
195                                 (unsigned long long)e->sector);
196                 /* strange behavior of some lower level drivers...
197                  * fail the request by clearing the uptodate flag,
198                  * but do not return any error?! */
199                 error = -EIO;
200         }
201
202         if (error)
203                 set_bit(__EE_WAS_ERROR, &e->flags);
204
205         bio_put(bio); /* no need for the bio anymore */
206         if (atomic_dec_and_test(&e->pending_bios)) {
207                 if (is_write)
208                         drbd_endio_write_sec_final(e);
209                 else
210                         drbd_endio_read_sec_final(e);
211         }
212 }
213
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218         struct drbd_request *req = bio->bi_private;
219         struct drbd_conf *mdev = req->mdev;
220         enum drbd_req_event what;
221         int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223         if (!error && !uptodate) {
224                 dev_warn(DEV, "p %s: setting error to -EIO\n",
225                          bio_data_dir(bio) == WRITE ? "write" : "read");
226                 /* strange behavior of some lower level drivers...
227                  * fail the request by clearing the uptodate flag,
228                  * but do not return any error?! */
229                 error = -EIO;
230         }
231
232         /* to avoid recursion in __req_mod */
233         if (unlikely(error)) {
234                 what = (bio_data_dir(bio) == WRITE)
235                         ? write_completed_with_error
236                         : (bio_rw(bio) == READ)
237                           ? read_completed_with_error
238                           : read_ahead_completed_with_error;
239         } else
240                 what = completed_ok;
241
242         bio_put(req->private_bio);
243         req->private_bio = ERR_PTR(error);
244
245         req_mod(req, what);
246 }
247
248 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249 {
250         struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252         /* We should not detach for read io-error,
253          * but try to WRITE the P_DATA_REPLY to the failed location,
254          * to give the disk the chance to relocate that block */
255
256         spin_lock_irq(&mdev->req_lock);
257         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258                 _req_mod(req, read_retry_remote_canceled);
259                 spin_unlock_irq(&mdev->req_lock);
260                 return 1;
261         }
262         spin_unlock_irq(&mdev->req_lock);
263
264         return w_send_read_req(mdev, w, 0);
265 }
266
267 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268 {
269         ERR_IF(cancel) return 1;
270         dev_err(DEV, "resync inactive, but callback triggered??\n");
271         return 1; /* Simply ignore this! */
272 }
273
274 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275 {
276         struct hash_desc desc;
277         struct scatterlist sg;
278         struct page *page = e->pages;
279         struct page *tmp;
280         unsigned len;
281
282         desc.tfm = tfm;
283         desc.flags = 0;
284
285         sg_init_table(&sg, 1);
286         crypto_hash_init(&desc);
287
288         while ((tmp = page_chain_next(page))) {
289                 /* all but the last page will be fully used */
290                 sg_set_page(&sg, page, PAGE_SIZE, 0);
291                 crypto_hash_update(&desc, &sg, sg.length);
292                 page = tmp;
293         }
294         /* and now the last, possibly only partially used page */
295         len = e->size & (PAGE_SIZE - 1);
296         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297         crypto_hash_update(&desc, &sg, sg.length);
298         crypto_hash_final(&desc, digest);
299 }
300
301 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302 {
303         struct hash_desc desc;
304         struct scatterlist sg;
305         struct bio_vec *bvec;
306         int i;
307
308         desc.tfm = tfm;
309         desc.flags = 0;
310
311         sg_init_table(&sg, 1);
312         crypto_hash_init(&desc);
313
314         __bio_for_each_segment(bvec, bio, i, 0) {
315                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316                 crypto_hash_update(&desc, &sg, sg.length);
317         }
318         crypto_hash_final(&desc, digest);
319 }
320
321 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322 {
323         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324         int digest_size;
325         void *digest;
326         int ok;
327
328         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330         if (unlikely(cancel)) {
331                 drbd_free_ee(mdev, e);
332                 return 1;
333         }
334
335         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337                 digest = kmalloc(digest_size, GFP_NOIO);
338                 if (digest) {
339                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341                         inc_rs_pending(mdev);
342                         ok = drbd_send_drequest_csum(mdev,
343                                                      e->sector,
344                                                      e->size,
345                                                      digest,
346                                                      digest_size,
347                                                      P_CSUM_RS_REQUEST);
348                         kfree(digest);
349                 } else {
350                         dev_err(DEV, "kmalloc() of digest failed.\n");
351                         ok = 0;
352                 }
353         } else
354                 ok = 1;
355
356         drbd_free_ee(mdev, e);
357
358         if (unlikely(!ok))
359                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360         return ok;
361 }
362
363 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
364
365 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366 {
367         struct drbd_epoch_entry *e;
368
369         if (!get_ldev(mdev))
370                 return -EIO;
371
372         if (drbd_rs_should_slow_down(mdev))
373                 goto defer;
374
375         /* GFP_TRY, because if there is no memory available right now, this may
376          * be rescheduled for later. It is "only" background resync, after all. */
377         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378         if (!e)
379                 goto defer;
380
381         e->w.cb = w_e_send_csum;
382         spin_lock_irq(&mdev->req_lock);
383         list_add(&e->w.list, &mdev->read_ee);
384         spin_unlock_irq(&mdev->req_lock);
385
386         atomic_add(size >> 9, &mdev->rs_sect_ev);
387         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388                 return 0;
389
390         drbd_free_ee(mdev, e);
391 defer:
392         put_ldev(mdev);
393         return -EAGAIN;
394 }
395
396 void resync_timer_fn(unsigned long data)
397 {
398         unsigned long flags;
399         struct drbd_conf *mdev = (struct drbd_conf *) data;
400         int queue;
401
402         spin_lock_irqsave(&mdev->req_lock, flags);
403
404         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
405                 queue = 1;
406                 if (mdev->state.conn == C_VERIFY_S)
407                         mdev->resync_work.cb = w_make_ov_request;
408                 else
409                         mdev->resync_work.cb = w_make_resync_request;
410         } else {
411                 queue = 0;
412                 mdev->resync_work.cb = w_resync_inactive;
413         }
414
415         spin_unlock_irqrestore(&mdev->req_lock, flags);
416
417         /* harmless race: list_empty outside data.work.q_lock */
418         if (list_empty(&mdev->resync_work.list) && queue)
419                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
420 }
421
422 static void fifo_set(struct fifo_buffer *fb, int value)
423 {
424         int i;
425
426         for (i = 0; i < fb->size; i++)
427                 fb->values[i] += value;
428 }
429
430 static int fifo_push(struct fifo_buffer *fb, int value)
431 {
432         int ov;
433
434         ov = fb->values[fb->head_index];
435         fb->values[fb->head_index++] = value;
436
437         if (fb->head_index >= fb->size)
438                 fb->head_index = 0;
439
440         return ov;
441 }
442
443 static void fifo_add_val(struct fifo_buffer *fb, int value)
444 {
445         int i;
446
447         for (i = 0; i < fb->size; i++)
448                 fb->values[i] += value;
449 }
450
451 int drbd_rs_controller(struct drbd_conf *mdev)
452 {
453         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
454         unsigned int want;     /* The number of sectors we want in the proxy */
455         int req_sect; /* Number of sectors to request in this turn */
456         int correction; /* Number of sectors more we need in the proxy*/
457         int cps; /* correction per invocation of drbd_rs_controller() */
458         int steps; /* Number of time steps to plan ahead */
459         int curr_corr;
460         int max_sect;
461
462         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
463         mdev->rs_in_flight -= sect_in;
464
465         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
466
467         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
468
469         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
470                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
471         } else { /* normal path */
472                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
473                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
474         }
475
476         correction = want - mdev->rs_in_flight - mdev->rs_planed;
477
478         /* Plan ahead */
479         cps = correction / steps;
480         fifo_add_val(&mdev->rs_plan_s, cps);
481         mdev->rs_planed += cps * steps;
482
483         /* What we do in this step */
484         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
485         spin_unlock(&mdev->peer_seq_lock);
486         mdev->rs_planed -= curr_corr;
487
488         req_sect = sect_in + curr_corr;
489         if (req_sect < 0)
490                 req_sect = 0;
491
492         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
493         if (req_sect > max_sect)
494                 req_sect = max_sect;
495
496         /*
497         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
498                  sect_in, mdev->rs_in_flight, want, correction,
499                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
500         */
501
502         return req_sect;
503 }
504
505 int w_make_resync_request(struct drbd_conf *mdev,
506                 struct drbd_work *w, int cancel)
507 {
508         unsigned long bit;
509         sector_t sector;
510         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
511         int max_segment_size;
512         int number, rollback_i, size, pe, mx;
513         int align, queued, sndbuf;
514         int i = 0;
515
516         if (unlikely(cancel))
517                 return 1;
518
519         if (unlikely(mdev->state.conn < C_CONNECTED)) {
520                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
521                 return 0;
522         }
523
524         if (mdev->state.conn != C_SYNC_TARGET)
525                 dev_err(DEV, "%s in w_make_resync_request\n",
526                         drbd_conn_str(mdev->state.conn));
527
528         if (!get_ldev(mdev)) {
529                 /* Since we only need to access mdev->rsync a
530                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
531                    to continue resync with a broken disk makes no sense at
532                    all */
533                 dev_err(DEV, "Disk broke down during resync!\n");
534                 mdev->resync_work.cb = w_resync_inactive;
535                 return 1;
536         }
537
538         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
539          * if it should be necessary */
540         max_segment_size = mdev->agreed_pro_version < 94 ?
541                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
542
543         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
544                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
545                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
546         } else {
547                 mdev->c_sync_rate = mdev->sync_conf.rate;
548                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
549         }
550
551         /* Throttle resync on lower level disk activity, which may also be
552          * caused by application IO on Primary/SyncTarget.
553          * Keep this after the call to drbd_rs_controller, as that assumes
554          * to be called as precisely as possible every SLEEP_TIME,
555          * and would be confused otherwise. */
556         if (drbd_rs_should_slow_down(mdev))
557                 goto requeue;
558
559         mutex_lock(&mdev->data.mutex);
560         if (mdev->data.socket)
561                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
562         else
563                 mx = 1;
564         mutex_unlock(&mdev->data.mutex);
565
566         /* For resync rates >160MB/sec, allow more pending RS requests */
567         if (number > mx)
568                 mx = number;
569
570         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
571         pe = atomic_read(&mdev->rs_pending_cnt);
572         if ((pe + number) > mx) {
573                 number = mx - pe;
574         }
575
576         for (i = 0; i < number; i++) {
577                 /* Stop generating RS requests, when half of the send buffer is filled */
578                 mutex_lock(&mdev->data.mutex);
579                 if (mdev->data.socket) {
580                         queued = mdev->data.socket->sk->sk_wmem_queued;
581                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
582                 } else {
583                         queued = 1;
584                         sndbuf = 0;
585                 }
586                 mutex_unlock(&mdev->data.mutex);
587                 if (queued > sndbuf / 2)
588                         goto requeue;
589
590 next_sector:
591                 size = BM_BLOCK_SIZE;
592                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
593
594                 if (bit == -1UL) {
595                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
596                         mdev->resync_work.cb = w_resync_inactive;
597                         put_ldev(mdev);
598                         return 1;
599                 }
600
601                 sector = BM_BIT_TO_SECT(bit);
602
603                 if (drbd_try_rs_begin_io(mdev, sector)) {
604                         mdev->bm_resync_fo = bit;
605                         goto requeue;
606                 }
607                 mdev->bm_resync_fo = bit + 1;
608
609                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
610                         drbd_rs_complete_io(mdev, sector);
611                         goto next_sector;
612                 }
613
614 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
615                 /* try to find some adjacent bits.
616                  * we stop if we have already the maximum req size.
617                  *
618                  * Additionally always align bigger requests, in order to
619                  * be prepared for all stripe sizes of software RAIDs.
620                  */
621                 align = 1;
622                 rollback_i = i;
623                 for (;;) {
624                         if (size + BM_BLOCK_SIZE > max_segment_size)
625                                 break;
626
627                         /* Be always aligned */
628                         if (sector & ((1<<(align+3))-1))
629                                 break;
630
631                         /* do not cross extent boundaries */
632                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
633                                 break;
634                         /* now, is it actually dirty, after all?
635                          * caution, drbd_bm_test_bit is tri-state for some
636                          * obscure reason; ( b == 0 ) would get the out-of-band
637                          * only accidentally right because of the "oddly sized"
638                          * adjustment below */
639                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
640                                 break;
641                         bit++;
642                         size += BM_BLOCK_SIZE;
643                         if ((BM_BLOCK_SIZE << align) <= size)
644                                 align++;
645                         i++;
646                 }
647                 /* if we merged some,
648                  * reset the offset to start the next drbd_bm_find_next from */
649                 if (size > BM_BLOCK_SIZE)
650                         mdev->bm_resync_fo = bit + 1;
651 #endif
652
653                 /* adjust very last sectors, in case we are oddly sized */
654                 if (sector + (size>>9) > capacity)
655                         size = (capacity-sector)<<9;
656                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
657                         switch (read_for_csum(mdev, sector, size)) {
658                         case -EIO: /* Disk failure */
659                                 put_ldev(mdev);
660                                 return 0;
661                         case -EAGAIN: /* allocation failed, or ldev busy */
662                                 drbd_rs_complete_io(mdev, sector);
663                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
664                                 i = rollback_i;
665                                 goto requeue;
666                         case 0:
667                                 /* everything ok */
668                                 break;
669                         default:
670                                 BUG();
671                         }
672                 } else {
673                         inc_rs_pending(mdev);
674                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
675                                                sector, size, ID_SYNCER)) {
676                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
677                                 dec_rs_pending(mdev);
678                                 put_ldev(mdev);
679                                 return 0;
680                         }
681                 }
682         }
683
684         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
685                 /* last syncer _request_ was sent,
686                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
687                  * next sync group will resume), as soon as we receive the last
688                  * resync data block, and the last bit is cleared.
689                  * until then resync "work" is "inactive" ...
690                  */
691                 mdev->resync_work.cb = w_resync_inactive;
692                 put_ldev(mdev);
693                 return 1;
694         }
695
696  requeue:
697         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
698         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
699         put_ldev(mdev);
700         return 1;
701 }
702
703 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
704 {
705         int number, i, size;
706         sector_t sector;
707         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
708
709         if (unlikely(cancel))
710                 return 1;
711
712         if (unlikely(mdev->state.conn < C_CONNECTED)) {
713                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
714                 return 0;
715         }
716
717         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
718         if (atomic_read(&mdev->rs_pending_cnt) > number)
719                 goto requeue;
720
721         number -= atomic_read(&mdev->rs_pending_cnt);
722
723         sector = mdev->ov_position;
724         for (i = 0; i < number; i++) {
725                 if (sector >= capacity) {
726                         mdev->resync_work.cb = w_resync_inactive;
727                         return 1;
728                 }
729
730                 size = BM_BLOCK_SIZE;
731
732                 if (drbd_try_rs_begin_io(mdev, sector)) {
733                         mdev->ov_position = sector;
734                         goto requeue;
735                 }
736
737                 if (sector + (size>>9) > capacity)
738                         size = (capacity-sector)<<9;
739
740                 inc_rs_pending(mdev);
741                 if (!drbd_send_ov_request(mdev, sector, size)) {
742                         dec_rs_pending(mdev);
743                         return 0;
744                 }
745                 sector += BM_SECT_PER_BIT;
746         }
747         mdev->ov_position = sector;
748
749  requeue:
750         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
751         return 1;
752 }
753
754
/* Worker callback run when online verify is finished: frees the work
 * item @w, calls ov_oos_print() and then drbd_resync_finished().
 * Always returns 1; @cancel is not evaluated.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* the work item itself is released by this callback */
        kfree(w);

        ov_oos_print(mdev);
        (void)drbd_resync_finished(mdev);

        return 1;
}
763
/* Worker callback that frees the work item @w and then calls
 * drbd_resync_finished().  Always returns 1; @cancel is not evaluated.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* the work item itself is released by this callback */
        kfree(w);

        (void)drbd_resync_finished(mdev);

        return 1;
}
772
773 int drbd_resync_finished(struct drbd_conf *mdev)
774 {
775         unsigned long db, dt, dbdt;
776         unsigned long n_oos;
777         union drbd_state os, ns;
778         struct drbd_work *w;
779         char *khelper_cmd = NULL;
780
781         /* Remove all elements from the resync LRU. Since future actions
782          * might set bits in the (main) bitmap, then the entries in the
783          * resync LRU would be wrong. */
784         if (drbd_rs_del_all(mdev)) {
785                 /* In case this is not possible now, most probably because
786                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
787                  * queue (or even the read operations for those packets
788                  * is not finished by now).   Retry in 100ms. */
789
790                 drbd_kick_lo(mdev);
791                 __set_current_state(TASK_INTERRUPTIBLE);
792                 schedule_timeout(HZ / 10);
793                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
794                 if (w) {
795                         w->cb = w_resync_finished;
796                         drbd_queue_work(&mdev->data.work, w);
797                         return 1;
798                 }
799                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
800         }
801
802         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
803         if (dt <= 0)
804                 dt = 1;
805         db = mdev->rs_total;
806         dbdt = Bit2KB(db/dt);
807         mdev->rs_paused /= HZ;
808
809         if (!get_ldev(mdev))
810                 goto out;
811
812         spin_lock_irq(&mdev->req_lock);
813         os = mdev->state;
814
815         /* This protects us against multiple calls (that can happen in the presence
816            of application IO), and against connectivity loss just before we arrive here. */
817         if (os.conn <= C_CONNECTED)
818                 goto out_unlock;
819
820         ns = os;
821         ns.conn = C_CONNECTED;
822
823         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
824              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
825              "Online verify " : "Resync",
826              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827
828         n_oos = drbd_bm_total_weight(mdev);
829
830         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831                 if (n_oos) {
832                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
833                               n_oos, Bit2KB(1));
834                         khelper_cmd = "out-of-sync";
835                 }
836         } else {
837                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
838
839                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840                         khelper_cmd = "after-resync-target";
841
842                 if (mdev->csums_tfm && mdev->rs_total) {
843                         const unsigned long s = mdev->rs_same_csum;
844                         const unsigned long t = mdev->rs_total;
845                         const int ratio =
846                                 (t == 0)     ? 0 :
847                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
848                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
849                              "transferred %luK total %luK\n",
850                              ratio,
851                              Bit2KB(mdev->rs_same_csum),
852                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853                              Bit2KB(mdev->rs_total));
854                 }
855         }
856
857         if (mdev->rs_failed) {
858                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
859
860                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861                         ns.disk = D_INCONSISTENT;
862                         ns.pdsk = D_UP_TO_DATE;
863                 } else {
864                         ns.disk = D_UP_TO_DATE;
865                         ns.pdsk = D_INCONSISTENT;
866                 }
867         } else {
868                 ns.disk = D_UP_TO_DATE;
869                 ns.pdsk = D_UP_TO_DATE;
870
871                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872                         if (mdev->p_uuid) {
873                                 int i;
874                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878                         } else {
879                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880                         }
881                 }
882
883                 drbd_uuid_set_bm(mdev, 0UL);
884
885                 if (mdev->p_uuid) {
886                         /* Now the two UUID sets are equal, update what we
887                          * know of the peer. */
888                         int i;
889                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
890                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
891                 }
892         }
893
894         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
895 out_unlock:
896         spin_unlock_irq(&mdev->req_lock);
897         put_ldev(mdev);
898 out:
899         mdev->rs_total  = 0;
900         mdev->rs_failed = 0;
901         mdev->rs_paused = 0;
902         mdev->ov_start_sector = 0;
903
904         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
905                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
906                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
907         }
908
909         if (khelper_cmd)
910                 drbd_khelper(mdev, khelper_cmd);
911
912         return 1;
913 }
914
915 /* helper */
916 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
917 {
918         if (drbd_ee_has_active_page(e)) {
919                 /* This might happen if sendpage() has not finished */
920                 spin_lock_irq(&mdev->req_lock);
921                 list_add_tail(&e->w.list, &mdev->net_ee);
922                 spin_unlock_irq(&mdev->req_lock);
923         } else
924                 drbd_free_ee(mdev, e);
925 }
926
927 /**
928  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
929  * @mdev:       DRBD device.
930  * @w:          work object.
931  * @cancel:     The connection will be closed anyways
932  */
933 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
934 {
935         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
936         int ok;
937
938         if (unlikely(cancel)) {
939                 drbd_free_ee(mdev, e);
940                 dec_unacked(mdev);
941                 return 1;
942         }
943
944         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
945                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
946         } else {
947                 if (__ratelimit(&drbd_ratelimit_state))
948                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
949                             (unsigned long long)e->sector);
950
951                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
952         }
953
954         dec_unacked(mdev);
955
956         move_to_net_ee_or_free(mdev, e);
957
958         if (unlikely(!ok))
959                 dev_err(DEV, "drbd_send_block() failed\n");
960         return ok;
961 }
962
963 /**
964  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
965  * @mdev:       DRBD device.
966  * @w:          work object.
967  * @cancel:     The connection will be closed anyways
968  */
969 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
970 {
971         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
972         int ok;
973
974         if (unlikely(cancel)) {
975                 drbd_free_ee(mdev, e);
976                 dec_unacked(mdev);
977                 return 1;
978         }
979
980         if (get_ldev_if_state(mdev, D_FAILED)) {
981                 drbd_rs_complete_io(mdev, e->sector);
982                 put_ldev(mdev);
983         }
984
985         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
986                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
987                         inc_rs_pending(mdev);
988                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
989                 } else {
990                         if (__ratelimit(&drbd_ratelimit_state))
991                                 dev_err(DEV, "Not sending RSDataReply, "
992                                     "partner DISKLESS!\n");
993                         ok = 1;
994                 }
995         } else {
996                 if (__ratelimit(&drbd_ratelimit_state))
997                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
998                             (unsigned long long)e->sector);
999
1000                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1001
1002                 /* update resync data with failure */
1003                 drbd_rs_failed_io(mdev, e->sector, e->size);
1004         }
1005
1006         dec_unacked(mdev);
1007
1008         move_to_net_ee_or_free(mdev, e);
1009
1010         if (unlikely(!ok))
1011                 dev_err(DEV, "drbd_send_block() failed\n");
1012         return ok;
1013 }
1014
1015 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1016 {
1017         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1018         struct digest_info *di;
1019         int digest_size;
1020         void *digest = NULL;
1021         int ok, eq = 0;
1022
1023         if (unlikely(cancel)) {
1024                 drbd_free_ee(mdev, e);
1025                 dec_unacked(mdev);
1026                 return 1;
1027         }
1028
1029         drbd_rs_complete_io(mdev, e->sector);
1030
1031         di = e->digest;
1032
1033         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1034                 /* quick hack to try to avoid a race against reconfiguration.
1035                  * a real fix would be much more involved,
1036                  * introducing more locking mechanisms */
1037                 if (mdev->csums_tfm) {
1038                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1039                         D_ASSERT(digest_size == di->digest_size);
1040                         digest = kmalloc(digest_size, GFP_NOIO);
1041                 }
1042                 if (digest) {
1043                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1044                         eq = !memcmp(digest, di->digest, digest_size);
1045                         kfree(digest);
1046                 }
1047
1048                 if (eq) {
1049                         drbd_set_in_sync(mdev, e->sector, e->size);
1050                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1051                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1052                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1053                 } else {
1054                         inc_rs_pending(mdev);
1055                         e->block_id = ID_SYNCER;
1056                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1057                 }
1058         } else {
1059                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1060                 if (__ratelimit(&drbd_ratelimit_state))
1061                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1062         }
1063
1064         dec_unacked(mdev);
1065         move_to_net_ee_or_free(mdev, e);
1066
1067         if (unlikely(!ok))
1068                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1069         return ok;
1070 }
1071
1072 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1073 {
1074         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1075         int digest_size;
1076         void *digest;
1077         int ok = 1;
1078
1079         if (unlikely(cancel))
1080                 goto out;
1081
1082         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1083                 goto out;
1084
1085         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1086         /* FIXME if this allocation fails, online verify will not terminate! */
1087         digest = kmalloc(digest_size, GFP_NOIO);
1088         if (digest) {
1089                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1090                 inc_rs_pending(mdev);
1091                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1092                                              digest, digest_size, P_OV_REPLY);
1093                 if (!ok)
1094                         dec_rs_pending(mdev);
1095                 kfree(digest);
1096         }
1097
1098 out:
1099         drbd_free_ee(mdev, e);
1100
1101         dec_unacked(mdev);
1102
1103         return ok;
1104 }
1105
1106 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1107 {
1108         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109                 mdev->ov_last_oos_size += size>>9;
1110         } else {
1111                 mdev->ov_last_oos_start = sector;
1112                 mdev->ov_last_oos_size = size>>9;
1113         }
1114         drbd_set_out_of_sync(mdev, sector, size);
1115         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1116 }
1117
1118 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1119 {
1120         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1121         struct digest_info *di;
1122         int digest_size;
1123         void *digest;
1124         int ok, eq = 0;
1125
1126         if (unlikely(cancel)) {
1127                 drbd_free_ee(mdev, e);
1128                 dec_unacked(mdev);
1129                 return 1;
1130         }
1131
1132         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1133          * the resync lru has been cleaned up already */
1134         drbd_rs_complete_io(mdev, e->sector);
1135
1136         di = e->digest;
1137
1138         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1139                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1140                 digest = kmalloc(digest_size, GFP_NOIO);
1141                 if (digest) {
1142                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1143
1144                         D_ASSERT(digest_size == di->digest_size);
1145                         eq = !memcmp(digest, di->digest, digest_size);
1146                         kfree(digest);
1147                 }
1148         } else {
1149                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1150                 if (__ratelimit(&drbd_ratelimit_state))
1151                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1152         }
1153
1154         dec_unacked(mdev);
1155         if (!eq)
1156                 drbd_ov_oos_found(mdev, e->sector, e->size);
1157         else
1158                 ov_oos_print(mdev);
1159
1160         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1161                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1162
1163         drbd_free_ee(mdev, e);
1164
1165         if (--mdev->ov_left == 0) {
1166                 ov_oos_print(mdev);
1167                 drbd_resync_finished(mdev);
1168         }
1169
1170         return ok;
1171 }
1172
1173 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1174 {
1175         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1176         complete(&b->done);
1177         return 1;
1178 }
1179
1180 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1181 {
1182         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1183         struct p_barrier *p = &mdev->data.sbuf.barrier;
1184         int ok = 1;
1185
1186         /* really avoid racing with tl_clear.  w.cb may have been referenced
1187          * just before it was reassigned and re-queued, so double check that.
1188          * actually, this race was harmless, since we only try to send the
1189          * barrier packet here, and otherwise do nothing with the object.
1190          * but compare with the head of w_clear_epoch */
1191         spin_lock_irq(&mdev->req_lock);
1192         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1193                 cancel = 1;
1194         spin_unlock_irq(&mdev->req_lock);
1195         if (cancel)
1196                 return 1;
1197
1198         if (!drbd_get_data_sock(mdev))
1199                 return 0;
1200         p->barrier = b->br_number;
1201         /* inc_ap_pending was done where this was queued.
1202          * dec_ap_pending will be done in got_BarrierAck
1203          * or (on connection loss) in w_clear_epoch.  */
1204         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1205                                 (struct p_header *)p, sizeof(*p), 0);
1206         drbd_put_data_sock(mdev);
1207
1208         return ok;
1209 }
1210
1211 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1212 {
1213         if (cancel)
1214                 return 1;
1215         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1216 }
1217
1218 /**
1219  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1220  * @mdev:       DRBD device.
1221  * @w:          work object.
1222  * @cancel:     The connection will be closed anyways
1223  */
1224 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1225 {
1226         struct drbd_request *req = container_of(w, struct drbd_request, w);
1227         int ok;
1228
1229         if (unlikely(cancel)) {
1230                 req_mod(req, send_canceled);
1231                 return 1;
1232         }
1233
1234         ok = drbd_send_dblock(mdev, req);
1235         req_mod(req, ok ? handed_over_to_network : send_failed);
1236
1237         return ok;
1238 }
1239
1240 /**
1241  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1242  * @mdev:       DRBD device.
1243  * @w:          work object.
1244  * @cancel:     The connection will be closed anyways
1245  */
1246 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247 {
1248         struct drbd_request *req = container_of(w, struct drbd_request, w);
1249         int ok;
1250
1251         if (unlikely(cancel)) {
1252                 req_mod(req, send_canceled);
1253                 return 1;
1254         }
1255
1256         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1257                                 (unsigned long)req);
1258
1259         if (!ok) {
1260                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1261                  * so this is probably redundant */
1262                 if (mdev->state.conn >= C_CONNECTED)
1263                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1264         }
1265         req_mod(req, ok ? handed_over_to_network : send_failed);
1266
1267         return ok;
1268 }
1269
1270 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1271 {
1272         struct drbd_request *req = container_of(w, struct drbd_request, w);
1273
1274         if (bio_data_dir(req->master_bio) == WRITE)
1275                 drbd_al_begin_io(mdev, req->sector);
1276         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1277            theoretically. Practically it can not deadlock, since this is
1278            only used when unfreezing IOs. All the extents of the requests
1279            that made it into the TL are already active */
1280
1281         drbd_req_make_private_bio(req, req->master_bio);
1282         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1283         generic_make_request(req->private_bio);
1284
1285         return 1;
1286 }
1287
1288 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1289 {
1290         struct drbd_conf *odev = mdev;
1291
1292         while (1) {
1293                 if (odev->sync_conf.after == -1)
1294                         return 1;
1295                 odev = minor_to_mdev(odev->sync_conf.after);
1296                 ERR_IF(!odev) return 1;
1297                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1298                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1299                     odev->state.aftr_isp || odev->state.peer_isp ||
1300                     odev->state.user_isp)
1301                         return 0;
1302         }
1303 }
1304
1305 /**
1306  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1307  * @mdev:       DRBD device.
1308  *
1309  * Called from process context only (admin command and after_state_ch).
1310  */
1311 static int _drbd_pause_after(struct drbd_conf *mdev)
1312 {
1313         struct drbd_conf *odev;
1314         int i, rv = 0;
1315
1316         for (i = 0; i < minor_count; i++) {
1317                 odev = minor_to_mdev(i);
1318                 if (!odev)
1319                         continue;
1320                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1321                         continue;
1322                 if (!_drbd_may_sync_now(odev))
1323                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1324                                != SS_NOTHING_TO_DO);
1325         }
1326
1327         return rv;
1328 }
1329
1330 /**
1331  * _drbd_resume_next() - Resume resync on all devices that may resync now
1332  * @mdev:       DRBD device.
1333  *
1334  * Called from process context only (admin command and worker).
1335  */
1336 static int _drbd_resume_next(struct drbd_conf *mdev)
1337 {
1338         struct drbd_conf *odev;
1339         int i, rv = 0;
1340
1341         for (i = 0; i < minor_count; i++) {
1342                 odev = minor_to_mdev(i);
1343                 if (!odev)
1344                         continue;
1345                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1346                         continue;
1347                 if (odev->state.aftr_isp) {
1348                         if (_drbd_may_sync_now(odev))
1349                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1350                                                         CS_HARD, NULL)
1351                                        != SS_NOTHING_TO_DO) ;
1352                 }
1353         }
1354         return rv;
1355 }
1356
1357 void resume_next_sg(struct drbd_conf *mdev)
1358 {
1359         write_lock_irq(&global_state_lock);
1360         _drbd_resume_next(mdev);
1361         write_unlock_irq(&global_state_lock);
1362 }
1363
1364 void suspend_other_sg(struct drbd_conf *mdev)
1365 {
1366         write_lock_irq(&global_state_lock);
1367         _drbd_pause_after(mdev);
1368         write_unlock_irq(&global_state_lock);
1369 }
1370
1371 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1372 {
1373         struct drbd_conf *odev;
1374
1375         if (o_minor == -1)
1376                 return NO_ERROR;
1377         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1378                 return ERR_SYNC_AFTER;
1379
1380         /* check for loops */
1381         odev = minor_to_mdev(o_minor);
1382         while (1) {
1383                 if (odev == mdev)
1384                         return ERR_SYNC_AFTER_CYCLE;
1385
1386                 /* dependency chain ends here, no cycles. */
1387                 if (odev->sync_conf.after == -1)
1388                         return NO_ERROR;
1389
1390                 /* follow the dependency chain */
1391                 odev = minor_to_mdev(odev->sync_conf.after);
1392         }
1393 }
1394
1395 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1396 {
1397         int changes;
1398         int retcode;
1399
1400         write_lock_irq(&global_state_lock);
1401         retcode = sync_after_error(mdev, na);
1402         if (retcode == NO_ERROR) {
1403                 mdev->sync_conf.after = na;
1404                 do {
1405                         changes  = _drbd_pause_after(mdev);
1406                         changes |= _drbd_resume_next(mdev);
1407                 } while (changes);
1408         }
1409         write_unlock_irq(&global_state_lock);
1410         return retcode;
1411 }
1412
1413 static void ping_peer(struct drbd_conf *mdev)
1414 {
1415         clear_bit(GOT_PING_ACK, &mdev->flags);
1416         request_ping(mdev);
1417         wait_event(mdev->misc_wait,
1418                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1419 }
1420
1421 /**
1422  * drbd_start_resync() - Start the resync process
1423  * @mdev:       DRBD device.
1424  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1425  *
1426  * This function might bring you directly into one of the
1427  * C_PAUSED_SYNC_* states.
1428  */
1429 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1430 {
1431         union drbd_state ns;
1432         int r;
1433
1434         if (mdev->state.conn >= C_SYNC_SOURCE) {
1435                 dev_err(DEV, "Resync already running!\n");
1436                 return;
1437         }
1438
1439         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1440         drbd_rs_cancel_all(mdev);
1441
1442         if (side == C_SYNC_TARGET) {
1443                 /* Since application IO was locked out during C_WF_BITMAP_T and
1444                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1445                    we check that we might make the data inconsistent. */
1446                 r = drbd_khelper(mdev, "before-resync-target");
1447                 r = (r >> 8) & 0xff;
1448                 if (r > 0) {
1449                         dev_info(DEV, "before-resync-target handler returned %d, "
1450                              "dropping connection.\n", r);
1451                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1452                         return;
1453                 }
1454         }
1455
1456         drbd_state_lock(mdev);
1457
1458         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1459                 drbd_state_unlock(mdev);
1460                 return;
1461         }
1462
1463         if (side == C_SYNC_TARGET) {
1464                 mdev->bm_resync_fo = 0;
1465         } else /* side == C_SYNC_SOURCE */ {
1466                 u64 uuid;
1467
1468                 get_random_bytes(&uuid, sizeof(u64));
1469                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1470                 drbd_send_sync_uuid(mdev, uuid);
1471
1472                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1473         }
1474
1475         write_lock_irq(&global_state_lock);
1476         ns = mdev->state;
1477
1478         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1479
1480         ns.conn = side;
1481
1482         if (side == C_SYNC_TARGET)
1483                 ns.disk = D_INCONSISTENT;
1484         else /* side == C_SYNC_SOURCE */
1485                 ns.pdsk = D_INCONSISTENT;
1486
1487         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1488         ns = mdev->state;
1489
1490         if (ns.conn < C_CONNECTED)
1491                 r = SS_UNKNOWN_ERROR;
1492
1493         if (r == SS_SUCCESS) {
1494                 unsigned long tw = drbd_bm_total_weight(mdev);
1495                 unsigned long now = jiffies;
1496                 int i;
1497
1498                 mdev->rs_failed    = 0;
1499                 mdev->rs_paused    = 0;
1500                 mdev->rs_same_csum = 0;
1501                 mdev->rs_last_events = 0;
1502                 mdev->rs_last_sect_ev = 0;
1503                 mdev->rs_total     = tw;
1504                 mdev->rs_start     = now;
1505                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1506                         mdev->rs_mark_left[i] = tw;
1507                         mdev->rs_mark_time[i] = now;
1508                 }
1509                 _drbd_pause_after(mdev);
1510         }
1511         write_unlock_irq(&global_state_lock);
1512         put_ldev(mdev);
1513
1514         if (r == SS_SUCCESS) {
1515                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1516                      drbd_conn_str(ns.conn),
1517                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1518                      (unsigned long) mdev->rs_total);
1519
1520                 if (mdev->rs_total == 0) {
1521                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1522                         ping_peer(mdev);
1523                         drbd_resync_finished(mdev);
1524                 }
1525
1526                 atomic_set(&mdev->rs_sect_in, 0);
1527                 atomic_set(&mdev->rs_sect_ev, 0);
1528                 mdev->rs_in_flight = 0;
1529                 mdev->rs_planed = 0;
1530                 spin_lock(&mdev->peer_seq_lock);
1531                 fifo_set(&mdev->rs_plan_s, 0);
1532                 spin_unlock(&mdev->peer_seq_lock);
1533                 /* ns.conn may already be != mdev->state.conn,
1534                  * we may have been paused in between, or become paused until
1535                  * the timer triggers.
1536                  * No matter, that is handled in resync_timer_fn() */
1537                 if (ns.conn == C_SYNC_TARGET)
1538                         mod_timer(&mdev->resync_timer, jiffies);
1539
1540                 drbd_md_sync(mdev);
1541         }
1542         drbd_state_unlock(mdev);
1543 }
1544
1545 int drbd_worker(struct drbd_thread *thi)
1546 {
1547         struct drbd_conf *mdev = thi->mdev;
1548         struct drbd_work *w = NULL;
1549         LIST_HEAD(work_list);
1550         int intr = 0, i;
1551
1552         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1553
1554         while (get_t_state(thi) == Running) {
1555                 drbd_thread_current_set_cpu(mdev);
1556
1557                 if (down_trylock(&mdev->data.work.s)) {
1558                         mutex_lock(&mdev->data.mutex);
1559                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1560                                 drbd_tcp_uncork(mdev->data.socket);
1561                         mutex_unlock(&mdev->data.mutex);
1562
1563                         intr = down_interruptible(&mdev->data.work.s);
1564
1565                         mutex_lock(&mdev->data.mutex);
1566                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1567                                 drbd_tcp_cork(mdev->data.socket);
1568                         mutex_unlock(&mdev->data.mutex);
1569                 }
1570
1571                 if (intr) {
1572                         D_ASSERT(intr == -EINTR);
1573                         flush_signals(current);
1574                         ERR_IF (get_t_state(thi) == Running)
1575                                 continue;
1576                         break;
1577                 }
1578
1579                 if (get_t_state(thi) != Running)
1580                         break;
1581                 /* With this break, we have done a down() but not consumed
1582                    the entry from the list. The cleanup code takes care of
1583                    this...   */
1584
1585                 w = NULL;
1586                 spin_lock_irq(&mdev->data.work.q_lock);
1587                 ERR_IF(list_empty(&mdev->data.work.q)) {
1588                         /* something terribly wrong in our logic.
1589                          * we were able to down() the semaphore,
1590                          * but the list is empty... doh.
1591                          *
1592                          * what is the best thing to do now?
1593                          * try again from scratch, restarting the receiver,
1594                          * asender, whatnot? could break even more ugly,
1595                          * e.g. when we are primary, but no good local data.
1596                          *
1597                          * I'll try to get away just starting over this loop.
1598                          */
1599                         spin_unlock_irq(&mdev->data.work.q_lock);
1600                         continue;
1601                 }
1602                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1603                 list_del_init(&w->list);
1604                 spin_unlock_irq(&mdev->data.work.q_lock);
1605
1606                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1607                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1608                         if (mdev->state.conn >= C_CONNECTED)
1609                                 drbd_force_state(mdev,
1610                                                 NS(conn, C_NETWORK_FAILURE));
1611                 }
1612         }
1613         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1614         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1615
1616         spin_lock_irq(&mdev->data.work.q_lock);
1617         i = 0;
1618         while (!list_empty(&mdev->data.work.q)) {
1619                 list_splice_init(&mdev->data.work.q, &work_list);
1620                 spin_unlock_irq(&mdev->data.work.q_lock);
1621
1622                 while (!list_empty(&work_list)) {
1623                         w = list_entry(work_list.next, struct drbd_work, list);
1624                         list_del_init(&w->list);
1625                         w->cb(mdev, w, 1);
1626                         i++; /* dead debugging code */
1627                 }
1628
1629                 spin_lock_irq(&mdev->data.work.q_lock);
1630         }
1631         sema_init(&mdev->data.work.s, 0);
1632         /* DANGEROUS race: if someone did queue his work within the spinlock,
1633          * but up() ed outside the spinlock, we could get an up() on the
1634          * semaphore without corresponding list entry.
1635          * So don't do that.
1636          */
1637         spin_unlock_irq(&mdev->data.work.q_lock);
1638
1639         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1640         /* _drbd_set_state only uses stop_nowait.
1641          * wait here for the Exiting receiver. */
1642         drbd_thread_stop(&mdev->receiver);
1643         drbd_mdev_cleanup(mdev);
1644
1645         dev_info(DEV, "worker terminated\n");
1646
1647         clear_bit(DEVICE_DYING, &mdev->flags);
1648         clear_bit(CONFIG_PENDING, &mdev->flags);
1649         wake_up(&mdev->state_wait);
1650
1651         return 0;
1652 }