drbd: No longer answer P_RS_DATA_REQUEST packets when in C_AHEAD mode
[linux-block.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43                                  struct drbd_work *w, int cancel);
44
45
46
47 /* defined here:
48    drbd_md_io_complete
49    drbd_endio_sec
50    drbd_endio_pri
51
52  * more endio handlers:
53    atodb_endio in drbd_actlog.c
54    drbd_bm_async_io_complete in drbd_bitmap.c
55
56  * For all these callbacks, note the following:
57  * The callbacks will be called in irq context by the IDE drivers,
58  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59  * Try to get the locking right :)
60  *
61  */
62
63
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
68 rwlock_t global_state_lock;
69
70 /* used for synchronous meta data and bitmap IO
71  * submitted by drbd_md_sync_page_io()
72  */
73 void drbd_md_io_complete(struct bio *bio, int error)
74 {
75         struct drbd_md_io *md_io;
76
77         md_io = (struct drbd_md_io *)bio->bi_private;
78         md_io->error = error;
79
80         complete(&md_io->event);
81 }
82
83 /* reads on behalf of the partner,
84  * "submitted" by the receiver
85  */
86 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87 {
88         unsigned long flags = 0;
89         struct drbd_conf *mdev = e->mdev;
90
91         D_ASSERT(e->block_id != ID_VACANT);
92
93         spin_lock_irqsave(&mdev->req_lock, flags);
94         mdev->read_cnt += e->size >> 9;
95         list_del(&e->w.list);
96         if (list_empty(&mdev->read_ee))
97                 wake_up(&mdev->ee_wait);
98         if (test_bit(__EE_WAS_ERROR, &e->flags))
99                 __drbd_chk_io_error(mdev, false);
100         spin_unlock_irqrestore(&mdev->req_lock, flags);
101
102         drbd_queue_work(&mdev->data.work, &e->w);
103         put_ldev(mdev);
104 }
105
106 /* writes on behalf of the partner, or resync writes,
107  * "submitted" by the receiver, final stage.  */
108 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109 {
110         unsigned long flags = 0;
111         struct drbd_conf *mdev = e->mdev;
112         sector_t e_sector;
113         int do_wake;
114         int is_syncer_req;
115         int do_al_complete_io;
116
117         D_ASSERT(e->block_id != ID_VACANT);
118
119         /* after we moved e to done_ee,
120          * we may no longer access it,
121          * it may be freed/reused already!
122          * (as soon as we release the req_lock) */
123         e_sector = e->sector;
124         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125         is_syncer_req = is_syncer_block_id(e->block_id);
126
127         spin_lock_irqsave(&mdev->req_lock, flags);
128         mdev->writ_cnt += e->size >> 9;
129         list_del(&e->w.list); /* has been on active_ee or sync_ee */
130         list_add_tail(&e->w.list, &mdev->done_ee);
131
132         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133          * neither did we wake possibly waiting conflicting requests.
134          * done from "drbd_process_done_ee" within the appropriate w.cb
135          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136
137         do_wake = is_syncer_req
138                 ? list_empty(&mdev->sync_ee)
139                 : list_empty(&mdev->active_ee);
140
141         if (test_bit(__EE_WAS_ERROR, &e->flags))
142                 __drbd_chk_io_error(mdev, false);
143         spin_unlock_irqrestore(&mdev->req_lock, flags);
144
145         if (is_syncer_req)
146                 drbd_rs_complete_io(mdev, e_sector);
147
148         if (do_wake)
149                 wake_up(&mdev->ee_wait);
150
151         if (do_al_complete_io)
152                 drbd_al_complete_io(mdev, e_sector);
153
154         wake_asender(mdev);
155         put_ldev(mdev);
156 }
157
158 /* writes on behalf of the partner, or resync writes,
159  * "submitted" by the receiver.
160  */
161 void drbd_endio_sec(struct bio *bio, int error)
162 {
163         struct drbd_epoch_entry *e = bio->bi_private;
164         struct drbd_conf *mdev = e->mdev;
165         int uptodate = bio_flagged(bio, BIO_UPTODATE);
166         int is_write = bio_data_dir(bio) == WRITE;
167
168         if (error && __ratelimit(&drbd_ratelimit_state))
169                 dev_warn(DEV, "%s: error=%d s=%llus\n",
170                                 is_write ? "write" : "read", error,
171                                 (unsigned long long)e->sector);
172         if (!error && !uptodate) {
173                 if (__ratelimit(&drbd_ratelimit_state))
174                         dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
175                                         is_write ? "write" : "read",
176                                         (unsigned long long)e->sector);
177                 /* strange behavior of some lower level drivers...
178                  * fail the request by clearing the uptodate flag,
179                  * but do not return any error?! */
180                 error = -EIO;
181         }
182
183         if (error)
184                 set_bit(__EE_WAS_ERROR, &e->flags);
185
186         bio_put(bio); /* no need for the bio anymore */
187         if (atomic_dec_and_test(&e->pending_bios)) {
188                 if (is_write)
189                         drbd_endio_write_sec_final(e);
190                 else
191                         drbd_endio_read_sec_final(e);
192         }
193 }
194
195 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
196  */
197 void drbd_endio_pri(struct bio *bio, int error)
198 {
199         unsigned long flags;
200         struct drbd_request *req = bio->bi_private;
201         struct drbd_conf *mdev = req->mdev;
202         struct bio_and_error m;
203         enum drbd_req_event what;
204         int uptodate = bio_flagged(bio, BIO_UPTODATE);
205
206         if (!error && !uptodate) {
207                 dev_warn(DEV, "p %s: setting error to -EIO\n",
208                          bio_data_dir(bio) == WRITE ? "write" : "read");
209                 /* strange behavior of some lower level drivers...
210                  * fail the request by clearing the uptodate flag,
211                  * but do not return any error?! */
212                 error = -EIO;
213         }
214
215         /* to avoid recursion in __req_mod */
216         if (unlikely(error)) {
217                 what = (bio_data_dir(bio) == WRITE)
218                         ? write_completed_with_error
219                         : (bio_rw(bio) == READ)
220                           ? read_completed_with_error
221                           : read_ahead_completed_with_error;
222         } else
223                 what = completed_ok;
224
225         bio_put(req->private_bio);
226         req->private_bio = ERR_PTR(error);
227
228         /* not req_mod(), we need irqsave here! */
229         spin_lock_irqsave(&mdev->req_lock, flags);
230         __req_mod(req, what, &m);
231         spin_unlock_irqrestore(&mdev->req_lock, flags);
232
233         if (m.bio)
234                 complete_master_bio(mdev, &m);
235 }
236
237 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
238 {
239         struct drbd_request *req = container_of(w, struct drbd_request, w);
240
241         /* We should not detach for read io-error,
242          * but try to WRITE the P_DATA_REPLY to the failed location,
243          * to give the disk the chance to relocate that block */
244
245         spin_lock_irq(&mdev->req_lock);
246         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
247                 _req_mod(req, read_retry_remote_canceled);
248                 spin_unlock_irq(&mdev->req_lock);
249                 return 1;
250         }
251         spin_unlock_irq(&mdev->req_lock);
252
253         return w_send_read_req(mdev, w, 0);
254 }
255
256 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
257 {
258         ERR_IF(cancel) return 1;
259         dev_err(DEV, "resync inactive, but callback triggered??\n");
260         return 1; /* Simply ignore this! */
261 }
262
263 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
264 {
265         struct hash_desc desc;
266         struct scatterlist sg;
267         struct page *page = e->pages;
268         struct page *tmp;
269         unsigned len;
270
271         desc.tfm = tfm;
272         desc.flags = 0;
273
274         sg_init_table(&sg, 1);
275         crypto_hash_init(&desc);
276
277         while ((tmp = page_chain_next(page))) {
278                 /* all but the last page will be fully used */
279                 sg_set_page(&sg, page, PAGE_SIZE, 0);
280                 crypto_hash_update(&desc, &sg, sg.length);
281                 page = tmp;
282         }
283         /* and now the last, possibly only partially used page */
284         len = e->size & (PAGE_SIZE - 1);
285         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
286         crypto_hash_update(&desc, &sg, sg.length);
287         crypto_hash_final(&desc, digest);
288 }
289
290 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
291 {
292         struct hash_desc desc;
293         struct scatterlist sg;
294         struct bio_vec *bvec;
295         int i;
296
297         desc.tfm = tfm;
298         desc.flags = 0;
299
300         sg_init_table(&sg, 1);
301         crypto_hash_init(&desc);
302
303         __bio_for_each_segment(bvec, bio, i, 0) {
304                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
305                 crypto_hash_update(&desc, &sg, sg.length);
306         }
307         crypto_hash_final(&desc, digest);
308 }
309
310 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
311 {
312         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
313         int digest_size;
314         void *digest;
315         int ok;
316
317         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
318
319         if (unlikely(cancel)) {
320                 drbd_free_ee(mdev, e);
321                 return 1;
322         }
323
324         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
325                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
326                 digest = kmalloc(digest_size, GFP_NOIO);
327                 if (digest) {
328                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
329
330                         inc_rs_pending(mdev);
331                         ok = drbd_send_drequest_csum(mdev,
332                                                      e->sector,
333                                                      e->size,
334                                                      digest,
335                                                      digest_size,
336                                                      P_CSUM_RS_REQUEST);
337                         kfree(digest);
338                 } else {
339                         dev_err(DEV, "kmalloc() of digest failed.\n");
340                         ok = 0;
341                 }
342         } else
343                 ok = 1;
344
345         drbd_free_ee(mdev, e);
346
347         if (unlikely(!ok))
348                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
349         return ok;
350 }
351
352 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
353
354 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
355 {
356         struct drbd_epoch_entry *e;
357
358         if (!get_ldev(mdev))
359                 return -EIO;
360
361         if (drbd_rs_should_slow_down(mdev, sector))
362                 goto defer;
363
364         /* GFP_TRY, because if there is no memory available right now, this may
365          * be rescheduled for later. It is "only" background resync, after all. */
366         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
367         if (!e)
368                 goto defer;
369
370         e->w.cb = w_e_send_csum;
371         spin_lock_irq(&mdev->req_lock);
372         list_add(&e->w.list, &mdev->read_ee);
373         spin_unlock_irq(&mdev->req_lock);
374
375         atomic_add(size >> 9, &mdev->rs_sect_ev);
376         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
377                 return 0;
378
379         /* drbd_submit_ee currently fails for one reason only:
380          * not being able to allocate enough bios.
381          * Is dropping the connection going to help? */
382         spin_lock_irq(&mdev->req_lock);
383         list_del(&e->w.list);
384         spin_unlock_irq(&mdev->req_lock);
385
386         drbd_free_ee(mdev, e);
387 defer:
388         put_ldev(mdev);
389         return -EAGAIN;
390 }
391
392 void resync_timer_fn(unsigned long data)
393 {
394         struct drbd_conf *mdev = (struct drbd_conf *) data;
395         int queue;
396
397         queue = 1;
398         switch (mdev->state.conn) {
399         case C_VERIFY_S:
400                 mdev->resync_work.cb = w_make_ov_request;
401                 break;
402         case C_SYNC_TARGET:
403                 mdev->resync_work.cb = w_make_resync_request;
404                 break;
405         default:
406                 queue = 0;
407                 mdev->resync_work.cb = w_resync_inactive;
408         }
409
410         /* harmless race: list_empty outside data.work.q_lock */
411         if (list_empty(&mdev->resync_work.list) && queue)
412                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
413 }
414
415 static void fifo_set(struct fifo_buffer *fb, int value)
416 {
417         int i;
418
419         for (i = 0; i < fb->size; i++)
420                 fb->values[i] = value;
421 }
422
423 static int fifo_push(struct fifo_buffer *fb, int value)
424 {
425         int ov;
426
427         ov = fb->values[fb->head_index];
428         fb->values[fb->head_index++] = value;
429
430         if (fb->head_index >= fb->size)
431                 fb->head_index = 0;
432
433         return ov;
434 }
435
436 static void fifo_add_val(struct fifo_buffer *fb, int value)
437 {
438         int i;
439
440         for (i = 0; i < fb->size; i++)
441                 fb->values[i] += value;
442 }
443
444 static int drbd_rs_controller(struct drbd_conf *mdev)
445 {
446         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
447         unsigned int want;     /* The number of sectors we want in the proxy */
448         int req_sect; /* Number of sectors to request in this turn */
449         int correction; /* Number of sectors more we need in the proxy*/
450         int cps; /* correction per invocation of drbd_rs_controller() */
451         int steps; /* Number of time steps to plan ahead */
452         int curr_corr;
453         int max_sect;
454
455         sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
456         mdev->rs_in_flight -= sect_in;
457
458         spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
459
460         steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
461
462         if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
463                 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
464         } else { /* normal path */
465                 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
466                         sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
467         }
468
469         correction = want - mdev->rs_in_flight - mdev->rs_planed;
470
471         /* Plan ahead */
472         cps = correction / steps;
473         fifo_add_val(&mdev->rs_plan_s, cps);
474         mdev->rs_planed += cps * steps;
475
476         /* What we do in this step */
477         curr_corr = fifo_push(&mdev->rs_plan_s, 0);
478         spin_unlock(&mdev->peer_seq_lock);
479         mdev->rs_planed -= curr_corr;
480
481         req_sect = sect_in + curr_corr;
482         if (req_sect < 0)
483                 req_sect = 0;
484
485         max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
486         if (req_sect > max_sect)
487                 req_sect = max_sect;
488
489         /*
490         dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
491                  sect_in, mdev->rs_in_flight, want, correction,
492                  steps, cps, mdev->rs_planed, curr_corr, req_sect);
493         */
494
495         return req_sect;
496 }
497
498 static int drbd_rs_number_requests(struct drbd_conf *mdev)
499 {
500         int number;
501         if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
502                 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
503                 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
504         } else {
505                 mdev->c_sync_rate = mdev->sync_conf.rate;
506                 number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
507         }
508
509         /* ignore the amount of pending requests, the resync controller should
510          * throttle down to incoming reply rate soon enough anyways. */
511         return number;
512 }
513
514 static int w_make_resync_request(struct drbd_conf *mdev,
515                                  struct drbd_work *w, int cancel)
516 {
517         unsigned long bit;
518         sector_t sector;
519         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
520         int max_bio_size;
521         int number, rollback_i, size;
522         int align, queued, sndbuf;
523         int i = 0;
524
525         if (unlikely(cancel))
526                 return 1;
527
528         if (unlikely(mdev->state.conn < C_CONNECTED)) {
529                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
530                 return 0;
531         }
532
533         if (mdev->state.conn != C_SYNC_TARGET)
534                 dev_err(DEV, "%s in w_make_resync_request\n",
535                         drbd_conn_str(mdev->state.conn));
536
537         if (mdev->rs_total == 0) {
538                 /* empty resync? */
539                 drbd_resync_finished(mdev);
540                 return 1;
541         }
542
543         if (!get_ldev(mdev)) {
544                 /* Since we only need to access mdev->rsync a
545                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
546                    to continue resync with a broken disk makes no sense at
547                    all */
548                 dev_err(DEV, "Disk broke down during resync!\n");
549                 mdev->resync_work.cb = w_resync_inactive;
550                 return 1;
551         }
552
553         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
554          * if it should be necessary */
555         max_bio_size =
556                 mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
557                 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
558
559         number = drbd_rs_number_requests(mdev);
560         if (number == 0)
561                 goto requeue;
562
563         for (i = 0; i < number; i++) {
564                 /* Stop generating RS requests, when half of the send buffer is filled */
565                 mutex_lock(&mdev->data.mutex);
566                 if (mdev->data.socket) {
567                         queued = mdev->data.socket->sk->sk_wmem_queued;
568                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
569                 } else {
570                         queued = 1;
571                         sndbuf = 0;
572                 }
573                 mutex_unlock(&mdev->data.mutex);
574                 if (queued > sndbuf / 2)
575                         goto requeue;
576
577 next_sector:
578                 size = BM_BLOCK_SIZE;
579                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
580
581                 if (bit == DRBD_END_OF_BITMAP) {
582                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
583                         mdev->resync_work.cb = w_resync_inactive;
584                         put_ldev(mdev);
585                         return 1;
586                 }
587
588                 sector = BM_BIT_TO_SECT(bit);
589
590                 if (drbd_rs_should_slow_down(mdev, sector) ||
591                     drbd_try_rs_begin_io(mdev, sector)) {
592                         mdev->bm_resync_fo = bit;
593                         goto requeue;
594                 }
595                 mdev->bm_resync_fo = bit + 1;
596
597                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
598                         drbd_rs_complete_io(mdev, sector);
599                         goto next_sector;
600                 }
601
602 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
603                 /* try to find some adjacent bits.
604                  * we stop if we have already the maximum req size.
605                  *
606                  * Additionally always align bigger requests, in order to
607                  * be prepared for all stripe sizes of software RAIDs.
608                  */
609                 align = 1;
610                 rollback_i = i;
611                 for (;;) {
612                         if (size + BM_BLOCK_SIZE > max_bio_size)
613                                 break;
614
615                         /* Be always aligned */
616                         if (sector & ((1<<(align+3))-1))
617                                 break;
618
619                         /* do not cross extent boundaries */
620                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
621                                 break;
622                         /* now, is it actually dirty, after all?
623                          * caution, drbd_bm_test_bit is tri-state for some
624                          * obscure reason; ( b == 0 ) would get the out-of-band
625                          * only accidentally right because of the "oddly sized"
626                          * adjustment below */
627                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
628                                 break;
629                         bit++;
630                         size += BM_BLOCK_SIZE;
631                         if ((BM_BLOCK_SIZE << align) <= size)
632                                 align++;
633                         i++;
634                 }
635                 /* if we merged some,
636                  * reset the offset to start the next drbd_bm_find_next from */
637                 if (size > BM_BLOCK_SIZE)
638                         mdev->bm_resync_fo = bit + 1;
639 #endif
640
641                 /* adjust very last sectors, in case we are oddly sized */
642                 if (sector + (size>>9) > capacity)
643                         size = (capacity-sector)<<9;
644                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
645                         switch (read_for_csum(mdev, sector, size)) {
646                         case -EIO: /* Disk failure */
647                                 put_ldev(mdev);
648                                 return 0;
649                         case -EAGAIN: /* allocation failed, or ldev busy */
650                                 drbd_rs_complete_io(mdev, sector);
651                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
652                                 i = rollback_i;
653                                 goto requeue;
654                         case 0:
655                                 /* everything ok */
656                                 break;
657                         default:
658                                 BUG();
659                         }
660                 } else {
661                         inc_rs_pending(mdev);
662                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
663                                                sector, size, ID_SYNCER)) {
664                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
665                                 dec_rs_pending(mdev);
666                                 put_ldev(mdev);
667                                 return 0;
668                         }
669                 }
670         }
671
672         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
673                 /* last syncer _request_ was sent,
674                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
675                  * next sync group will resume), as soon as we receive the last
676                  * resync data block, and the last bit is cleared.
677                  * until then resync "work" is "inactive" ...
678                  */
679                 mdev->resync_work.cb = w_resync_inactive;
680                 put_ldev(mdev);
681                 return 1;
682         }
683
684  requeue:
685         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
686         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
687         put_ldev(mdev);
688         return 1;
689 }
690
691 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
692 {
693         int number, i, size;
694         sector_t sector;
695         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
696
697         if (unlikely(cancel))
698                 return 1;
699
700         if (unlikely(mdev->state.conn < C_CONNECTED)) {
701                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
702                 return 0;
703         }
704
705         number = drbd_rs_number_requests(mdev);
706
707         sector = mdev->ov_position;
708         for (i = 0; i < number; i++) {
709                 if (sector >= capacity) {
710                         mdev->resync_work.cb = w_resync_inactive;
711                         return 1;
712                 }
713
714                 size = BM_BLOCK_SIZE;
715
716                 if (drbd_rs_should_slow_down(mdev, sector) ||
717                     drbd_try_rs_begin_io(mdev, sector)) {
718                         mdev->ov_position = sector;
719                         goto requeue;
720                 }
721
722                 if (sector + (size>>9) > capacity)
723                         size = (capacity-sector)<<9;
724
725                 inc_rs_pending(mdev);
726                 if (!drbd_send_ov_request(mdev, sector, size)) {
727                         dec_rs_pending(mdev);
728                         return 0;
729                 }
730                 sector += BM_SECT_PER_BIT;
731         }
732         mdev->ov_position = sector;
733
734  requeue:
735         mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
736         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
737         return 1;
738 }
739
740
741 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
742 {
743         drbd_start_resync(mdev, C_SYNC_SOURCE);
744
745         return 1;
746 }
747
/*
 * Worker callback run at the end of an online verify: frees the work
 * item @w itself, calls ov_oos_print() and then drbd_resync_finished().
 * @cancel is not looked at.  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is owned by this callback from here on */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);
	return 1;
}
756
/*
 * Worker callback used by drbd_resync_finished() to retry itself later:
 * frees the kmalloc()ed work item @w and calls drbd_resync_finished()
 * again.  @cancel is not looked at.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	drbd_resync_finished(mdev);
	return 1;
}
765
766 static void ping_peer(struct drbd_conf *mdev)
767 {
768         clear_bit(GOT_PING_ACK, &mdev->flags);
769         request_ping(mdev);
770         wait_event(mdev->misc_wait,
771                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
772 }
773
774 int drbd_resync_finished(struct drbd_conf *mdev)
775 {
776         unsigned long db, dt, dbdt;
777         unsigned long n_oos;
778         union drbd_state os, ns;
779         struct drbd_work *w;
780         char *khelper_cmd = NULL;
781         int verify_done = 0;
782
783         /* Remove all elements from the resync LRU. Since future actions
784          * might set bits in the (main) bitmap, then the entries in the
785          * resync LRU would be wrong. */
786         if (drbd_rs_del_all(mdev)) {
787                 /* In case this is not possible now, most probably because
788                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
789                  * queue (or even the read operations for those packets
790                  * is not finished by now).   Retry in 100ms. */
791
792                 __set_current_state(TASK_INTERRUPTIBLE);
793                 schedule_timeout(HZ / 10);
794                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
795                 if (w) {
796                         w->cb = w_resync_finished;
797                         drbd_queue_work(&mdev->data.work, w);
798                         return 1;
799                 }
800                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
801         }
802
803         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
804         if (dt <= 0)
805                 dt = 1;
806         db = mdev->rs_total;
807         dbdt = Bit2KB(db/dt);
808         mdev->rs_paused /= HZ;
809
810         if (!get_ldev(mdev))
811                 goto out;
812
813         ping_peer(mdev);
814
815         spin_lock_irq(&mdev->req_lock);
816         os = mdev->state;
817
818         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
819
820         /* This protects us against multiple calls (that can happen in the presence
821            of application IO), and against connectivity loss just before we arrive here. */
822         if (os.conn <= C_CONNECTED)
823                 goto out_unlock;
824
825         ns = os;
826         ns.conn = C_CONNECTED;
827
828         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
829              verify_done ? "Online verify " : "Resync",
830              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
831
832         n_oos = drbd_bm_total_weight(mdev);
833
834         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
835                 if (n_oos) {
836                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
837                               n_oos, Bit2KB(1));
838                         khelper_cmd = "out-of-sync";
839                 }
840         } else {
841                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
842
843                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
844                         khelper_cmd = "after-resync-target";
845
846                 if (mdev->csums_tfm && mdev->rs_total) {
847                         const unsigned long s = mdev->rs_same_csum;
848                         const unsigned long t = mdev->rs_total;
849                         const int ratio =
850                                 (t == 0)     ? 0 :
851                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
852                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
853                              "transferred %luK total %luK\n",
854                              ratio,
855                              Bit2KB(mdev->rs_same_csum),
856                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
857                              Bit2KB(mdev->rs_total));
858                 }
859         }
860
861         if (mdev->rs_failed) {
862                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
863
864                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
865                         ns.disk = D_INCONSISTENT;
866                         ns.pdsk = D_UP_TO_DATE;
867                 } else {
868                         ns.disk = D_UP_TO_DATE;
869                         ns.pdsk = D_INCONSISTENT;
870                 }
871         } else {
872                 ns.disk = D_UP_TO_DATE;
873                 ns.pdsk = D_UP_TO_DATE;
874
875                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
876                         if (mdev->p_uuid) {
877                                 int i;
878                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
879                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
880                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
881                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
882                         } else {
883                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
884                         }
885                 }
886
887                 drbd_uuid_set_bm(mdev, 0UL);
888
889                 if (mdev->p_uuid) {
890                         /* Now the two UUID sets are equal, update what we
891                          * know of the peer. */
892                         int i;
893                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
894                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
895                 }
896         }
897
898         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
899 out_unlock:
900         spin_unlock_irq(&mdev->req_lock);
901         put_ldev(mdev);
902 out:
903         mdev->rs_total  = 0;
904         mdev->rs_failed = 0;
905         mdev->rs_paused = 0;
906         if (verify_done)
907                 mdev->ov_start_sector = 0;
908
909         drbd_md_sync(mdev);
910
911         if (khelper_cmd)
912                 drbd_khelper(mdev, khelper_cmd);
913
914         return 1;
915 }
916
917 /* helper */
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
919 {
920         if (drbd_ee_has_active_page(e)) {
921                 /* This might happen if sendpage() has not finished */
922                 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
923                 atomic_add(i, &mdev->pp_in_use_by_net);
924                 atomic_sub(i, &mdev->pp_in_use);
925                 spin_lock_irq(&mdev->req_lock);
926                 list_add_tail(&e->w.list, &mdev->net_ee);
927                 spin_unlock_irq(&mdev->req_lock);
928                 wake_up(&drbd_pp_wait);
929         } else
930                 drbd_free_ee(mdev, e);
931 }
932
933 /**
934  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935  * @mdev:       DRBD device.
936  * @w:          work object.
937  * @cancel:     The connection will be closed anyways
938  */
939 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
940 {
941         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
942         int ok;
943
944         if (unlikely(cancel)) {
945                 drbd_free_ee(mdev, e);
946                 dec_unacked(mdev);
947                 return 1;
948         }
949
950         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
952         } else {
953                 if (__ratelimit(&drbd_ratelimit_state))
954                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
955                             (unsigned long long)e->sector);
956
957                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
958         }
959
960         dec_unacked(mdev);
961
962         move_to_net_ee_or_free(mdev, e);
963
964         if (unlikely(!ok))
965                 dev_err(DEV, "drbd_send_block() failed\n");
966         return ok;
967 }
968
969 /**
970  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
971  * @mdev:       DRBD device.
972  * @w:          work object.
973  * @cancel:     The connection will be closed anyways
974  */
975 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978         int ok;
979
980         if (unlikely(cancel)) {
981                 drbd_free_ee(mdev, e);
982                 dec_unacked(mdev);
983                 return 1;
984         }
985
986         if (get_ldev_if_state(mdev, D_FAILED)) {
987                 drbd_rs_complete_io(mdev, e->sector);
988                 put_ldev(mdev);
989         }
990
991         if (mdev->state.conn == C_AHEAD) {
992                 ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
993         } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995                         inc_rs_pending(mdev);
996                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997                 } else {
998                         if (__ratelimit(&drbd_ratelimit_state))
999                                 dev_err(DEV, "Not sending RSDataReply, "
1000                                     "partner DISKLESS!\n");
1001                         ok = 1;
1002                 }
1003         } else {
1004                 if (__ratelimit(&drbd_ratelimit_state))
1005                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006                             (unsigned long long)e->sector);
1007
1008                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009
1010                 /* update resync data with failure */
1011                 drbd_rs_failed_io(mdev, e->sector, e->size);
1012         }
1013
1014         dec_unacked(mdev);
1015
1016         move_to_net_ee_or_free(mdev, e);
1017
1018         if (unlikely(!ok))
1019                 dev_err(DEV, "drbd_send_block() failed\n");
1020         return ok;
1021 }
1022
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026         struct digest_info *di;
1027         int digest_size;
1028         void *digest = NULL;
1029         int ok, eq = 0;
1030
1031         if (unlikely(cancel)) {
1032                 drbd_free_ee(mdev, e);
1033                 dec_unacked(mdev);
1034                 return 1;
1035         }
1036
1037         if (get_ldev(mdev)) {
1038                 drbd_rs_complete_io(mdev, e->sector);
1039                 put_ldev(mdev);
1040         }
1041
1042         di = e->digest;
1043
1044         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045                 /* quick hack to try to avoid a race against reconfiguration.
1046                  * a real fix would be much more involved,
1047                  * introducing more locking mechanisms */
1048                 if (mdev->csums_tfm) {
1049                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050                         D_ASSERT(digest_size == di->digest_size);
1051                         digest = kmalloc(digest_size, GFP_NOIO);
1052                 }
1053                 if (digest) {
1054                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055                         eq = !memcmp(digest, di->digest, digest_size);
1056                         kfree(digest);
1057                 }
1058
1059                 if (eq) {
1060                         drbd_set_in_sync(mdev, e->sector, e->size);
1061                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1062                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064                 } else {
1065                         inc_rs_pending(mdev);
1066                         e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067                         e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068                         kfree(di);
1069                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070                 }
1071         } else {
1072                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073                 if (__ratelimit(&drbd_ratelimit_state))
1074                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075         }
1076
1077         dec_unacked(mdev);
1078         move_to_net_ee_or_free(mdev, e);
1079
1080         if (unlikely(!ok))
1081                 dev_err(DEV, "drbd_send_block/ack() failed\n");
1082         return ok;
1083 }
1084
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088         int digest_size;
1089         void *digest;
1090         int ok = 1;
1091
1092         if (unlikely(cancel))
1093                 goto out;
1094
1095         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096                 goto out;
1097
1098         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099         /* FIXME if this allocation fails, online verify will not terminate! */
1100         digest = kmalloc(digest_size, GFP_NOIO);
1101         if (digest) {
1102                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103                 inc_rs_pending(mdev);
1104                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105                                              digest, digest_size, P_OV_REPLY);
1106                 if (!ok)
1107                         dec_rs_pending(mdev);
1108                 kfree(digest);
1109         }
1110
1111 out:
1112         drbd_free_ee(mdev, e);
1113
1114         dec_unacked(mdev);
1115
1116         return ok;
1117 }
1118
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122                 mdev->ov_last_oos_size += size>>9;
1123         } else {
1124                 mdev->ov_last_oos_start = sector;
1125                 mdev->ov_last_oos_size = size>>9;
1126         }
1127         drbd_set_out_of_sync(mdev, sector, size);
1128 }
1129
1130 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1133         struct digest_info *di;
1134         int digest_size;
1135         void *digest;
1136         int ok, eq = 0;
1137
1138         if (unlikely(cancel)) {
1139                 drbd_free_ee(mdev, e);
1140                 dec_unacked(mdev);
1141                 return 1;
1142         }
1143
1144         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1145          * the resync lru has been cleaned up already */
1146         if (get_ldev(mdev)) {
1147                 drbd_rs_complete_io(mdev, e->sector);
1148                 put_ldev(mdev);
1149         }
1150
1151         di = e->digest;
1152
1153         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1154                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1155                 digest = kmalloc(digest_size, GFP_NOIO);
1156                 if (digest) {
1157                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1158
1159                         D_ASSERT(digest_size == di->digest_size);
1160                         eq = !memcmp(digest, di->digest, digest_size);
1161                         kfree(digest);
1162                 }
1163         } else {
1164                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1165                 if (__ratelimit(&drbd_ratelimit_state))
1166                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1167         }
1168
1169         dec_unacked(mdev);
1170         if (!eq)
1171                 drbd_ov_oos_found(mdev, e->sector, e->size);
1172         else
1173                 ov_oos_print(mdev);
1174
1175         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1176                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1177
1178         drbd_free_ee(mdev, e);
1179
1180         --mdev->ov_left;
1181
1182         /* let's advance progress step marks only for every other megabyte */
1183         if ((mdev->ov_left & 0x200) == 0x200)
1184                 drbd_advance_rs_marks(mdev, mdev->ov_left);
1185
1186         if (mdev->ov_left == 0) {
1187                 ov_oos_print(mdev);
1188                 drbd_resync_finished(mdev);
1189         }
1190
1191         return ok;
1192 }
1193
1194 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195 {
1196         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1197         complete(&b->done);
1198         return 1;
1199 }
1200
1201 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1202 {
1203         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1204         struct p_barrier *p = &mdev->data.sbuf.barrier;
1205         int ok = 1;
1206
1207         /* really avoid racing with tl_clear.  w.cb may have been referenced
1208          * just before it was reassigned and re-queued, so double check that.
1209          * actually, this race was harmless, since we only try to send the
1210          * barrier packet here, and otherwise do nothing with the object.
1211          * but compare with the head of w_clear_epoch */
1212         spin_lock_irq(&mdev->req_lock);
1213         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1214                 cancel = 1;
1215         spin_unlock_irq(&mdev->req_lock);
1216         if (cancel)
1217                 return 1;
1218
1219         if (!drbd_get_data_sock(mdev))
1220                 return 0;
1221         p->barrier = b->br_number;
1222         /* inc_ap_pending was done where this was queued.
1223          * dec_ap_pending will be done in got_BarrierAck
1224          * or (on connection loss) in w_clear_epoch.  */
1225         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1226                                 (struct p_header80 *)p, sizeof(*p), 0);
1227         drbd_put_data_sock(mdev);
1228
1229         return ok;
1230 }
1231
1232 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1233 {
1234         if (cancel)
1235                 return 1;
1236         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1237 }
1238
1239 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1240 {
1241         struct drbd_request *req = container_of(w, struct drbd_request, w);
1242         int ok;
1243
1244         if (unlikely(cancel)) {
1245                 req_mod(req, send_canceled);
1246                 return 1;
1247         }
1248
1249         ok = drbd_send_oos(mdev, req);
1250         req_mod(req, oos_handed_to_network);
1251
1252         return ok;
1253 }
1254
1255 /**
1256  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1257  * @mdev:       DRBD device.
1258  * @w:          work object.
1259  * @cancel:     The connection will be closed anyways
1260  */
1261 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1262 {
1263         struct drbd_request *req = container_of(w, struct drbd_request, w);
1264         int ok;
1265
1266         if (unlikely(cancel)) {
1267                 req_mod(req, send_canceled);
1268                 return 1;
1269         }
1270
1271         ok = drbd_send_dblock(mdev, req);
1272         req_mod(req, ok ? handed_over_to_network : send_failed);
1273
1274         return ok;
1275 }
1276
1277 /**
1278  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1279  * @mdev:       DRBD device.
1280  * @w:          work object.
1281  * @cancel:     The connection will be closed anyways
1282  */
1283 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1284 {
1285         struct drbd_request *req = container_of(w, struct drbd_request, w);
1286         int ok;
1287
1288         if (unlikely(cancel)) {
1289                 req_mod(req, send_canceled);
1290                 return 1;
1291         }
1292
1293         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1294                                 (unsigned long)req);
1295
1296         if (!ok) {
1297                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1298                  * so this is probably redundant */
1299                 if (mdev->state.conn >= C_CONNECTED)
1300                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1301         }
1302         req_mod(req, ok ? handed_over_to_network : send_failed);
1303
1304         return ok;
1305 }
1306
1307 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1308 {
1309         struct drbd_request *req = container_of(w, struct drbd_request, w);
1310
1311         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1312                 drbd_al_begin_io(mdev, req->sector);
1313         /* Calling drbd_al_begin_io() out of the worker might deadlocks
1314            theoretically. Practically it can not deadlock, since this is
1315            only used when unfreezing IOs. All the extents of the requests
1316            that made it into the TL are already active */
1317
1318         drbd_req_make_private_bio(req, req->master_bio);
1319         req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1320         generic_make_request(req->private_bio);
1321
1322         return 1;
1323 }
1324
1325 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1326 {
1327         struct drbd_conf *odev = mdev;
1328
1329         while (1) {
1330                 if (odev->sync_conf.after == -1)
1331                         return 1;
1332                 odev = minor_to_mdev(odev->sync_conf.after);
1333                 ERR_IF(!odev) return 1;
1334                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1335                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1336                     odev->state.aftr_isp || odev->state.peer_isp ||
1337                     odev->state.user_isp)
1338                         return 0;
1339         }
1340 }
1341
1342 /**
1343  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1344  * @mdev:       DRBD device.
1345  *
1346  * Called from process context only (admin command and after_state_ch).
1347  */
1348 static int _drbd_pause_after(struct drbd_conf *mdev)
1349 {
1350         struct drbd_conf *odev;
1351         int i, rv = 0;
1352
1353         for (i = 0; i < minor_count; i++) {
1354                 odev = minor_to_mdev(i);
1355                 if (!odev)
1356                         continue;
1357                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1358                         continue;
1359                 if (!_drbd_may_sync_now(odev))
1360                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1361                                != SS_NOTHING_TO_DO);
1362         }
1363
1364         return rv;
1365 }
1366
1367 /**
1368  * _drbd_resume_next() - Resume resync on all devices that may resync now
1369  * @mdev:       DRBD device.
1370  *
1371  * Called from process context only (admin command and worker).
1372  */
1373 static int _drbd_resume_next(struct drbd_conf *mdev)
1374 {
1375         struct drbd_conf *odev;
1376         int i, rv = 0;
1377
1378         for (i = 0; i < minor_count; i++) {
1379                 odev = minor_to_mdev(i);
1380                 if (!odev)
1381                         continue;
1382                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1383                         continue;
1384                 if (odev->state.aftr_isp) {
1385                         if (_drbd_may_sync_now(odev))
1386                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1387                                                         CS_HARD, NULL)
1388                                        != SS_NOTHING_TO_DO) ;
1389                 }
1390         }
1391         return rv;
1392 }
1393
/*
 * Locked wrapper around _drbd_resume_next(): takes global_state_lock for
 * writing (interrupts disabled) for the duration of the call.  The return
 * value of _drbd_resume_next() is discarded.
 */
void resume_next_sg(struct drbd_conf *mdev)
{
        write_lock_irq(&global_state_lock);
        _drbd_resume_next(mdev);
        write_unlock_irq(&global_state_lock);
}
1400
/*
 * Locked wrapper around _drbd_pause_after(): takes global_state_lock for
 * writing (interrupts disabled) for the duration of the call.  The return
 * value of _drbd_pause_after() is discarded.
 */
void suspend_other_sg(struct drbd_conf *mdev)
{
        write_lock_irq(&global_state_lock);
        _drbd_pause_after(mdev);
        write_unlock_irq(&global_state_lock);
}
1407
1408 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1409 {
1410         struct drbd_conf *odev;
1411
1412         if (o_minor == -1)
1413                 return NO_ERROR;
1414         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1415                 return ERR_SYNC_AFTER;
1416
1417         /* check for loops */
1418         odev = minor_to_mdev(o_minor);
1419         while (1) {
1420                 if (odev == mdev)
1421                         return ERR_SYNC_AFTER_CYCLE;
1422
1423                 /* dependency chain ends here, no cycles. */
1424                 if (odev->sync_conf.after == -1)
1425                         return NO_ERROR;
1426
1427                 /* follow the dependency chain */
1428                 odev = minor_to_mdev(odev->sync_conf.after);
1429         }
1430 }
1431
1432 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1433 {
1434         int changes;
1435         int retcode;
1436
1437         write_lock_irq(&global_state_lock);
1438         retcode = sync_after_error(mdev, na);
1439         if (retcode == NO_ERROR) {
1440                 mdev->sync_conf.after = na;
1441                 do {
1442                         changes  = _drbd_pause_after(mdev);
1443                         changes |= _drbd_resume_next(mdev);
1444                 } while (changes);
1445         }
1446         write_unlock_irq(&global_state_lock);
1447         return retcode;
1448 }
1449
/*
 * Reset the bookkeeping of the resync controller: the rs_sect_in and
 * rs_sect_ev counters, rs_in_flight and rs_planed are set to zero, and
 * fifo_set() is called with 0 on the rs_plan_s fifo while holding
 * peer_seq_lock.
 */
void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
        atomic_set(&mdev->rs_sect_in, 0);
        atomic_set(&mdev->rs_sect_ev, 0);
        mdev->rs_in_flight = 0;
        mdev->rs_planed = 0;
        /* NOTE(review): presumably fifo_set() stores 0 in every slot of the plan -- confirm */
        spin_lock(&mdev->peer_seq_lock);
        fifo_set(&mdev->rs_plan_s, 0);
        spin_unlock(&mdev->peer_seq_lock);
}
1460
1461 /**
1462  * drbd_start_resync() - Start the resync process
1463  * @mdev:       DRBD device.
1464  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1465  *
1466  * This function might bring you directly into one of the
1467  * C_PAUSED_SYNC_* states.
1468  */
1469 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1470 {
1471         union drbd_state ns;
1472         int r;
1473
1474         if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1475                 dev_err(DEV, "Resync already running!\n");
1476                 return;
1477         }
1478
1479         if (mdev->state.conn < C_AHEAD) {
1480                 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1481                 drbd_rs_cancel_all(mdev);
1482                 /* This should be done when we abort the resync. We definitely do not
1483                    want to have this for connections going back and forth between
1484                    Ahead/Behind and SyncSource/SyncTarget */
1485         }
1486
1487         if (side == C_SYNC_TARGET) {
1488                 /* Since application IO was locked out during C_WF_BITMAP_T and
1489                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1490                    we check that we might make the data inconsistent. */
1491                 r = drbd_khelper(mdev, "before-resync-target");
1492                 r = (r >> 8) & 0xff;
1493                 if (r > 0) {
1494                         dev_info(DEV, "before-resync-target handler returned %d, "
1495                              "dropping connection.\n", r);
1496                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1497                         return;
1498                 }
1499         } else /* C_SYNC_SOURCE */ {
1500                 r = drbd_khelper(mdev, "before-resync-source");
1501                 r = (r >> 8) & 0xff;
1502                 if (r > 0) {
1503                         if (r == 3) {
1504                                 dev_info(DEV, "before-resync-source handler returned %d, "
1505                                          "ignoring. Old userland tools?", r);
1506                         } else {
1507                                 dev_info(DEV, "before-resync-source handler returned %d, "
1508                                          "dropping connection.\n", r);
1509                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1510                                 return;
1511                         }
1512                 }
1513         }
1514
1515         drbd_state_lock(mdev);
1516
1517         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1518                 drbd_state_unlock(mdev);
1519                 return;
1520         }
1521
1522         write_lock_irq(&global_state_lock);
1523         ns = mdev->state;
1524
1525         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1526
1527         ns.conn = side;
1528
1529         if (side == C_SYNC_TARGET)
1530                 ns.disk = D_INCONSISTENT;
1531         else /* side == C_SYNC_SOURCE */
1532                 ns.pdsk = D_INCONSISTENT;
1533
1534         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1535         ns = mdev->state;
1536
1537         if (ns.conn < C_CONNECTED)
1538                 r = SS_UNKNOWN_ERROR;
1539
1540         if (r == SS_SUCCESS) {
1541                 unsigned long tw = drbd_bm_total_weight(mdev);
1542                 unsigned long now = jiffies;
1543                 int i;
1544
1545                 mdev->rs_failed    = 0;
1546                 mdev->rs_paused    = 0;
1547                 mdev->rs_same_csum = 0;
1548                 mdev->rs_last_events = 0;
1549                 mdev->rs_last_sect_ev = 0;
1550                 mdev->rs_total     = tw;
1551                 mdev->rs_start     = now;
1552                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1553                         mdev->rs_mark_left[i] = tw;
1554                         mdev->rs_mark_time[i] = now;
1555                 }
1556                 _drbd_pause_after(mdev);
1557         }
1558         write_unlock_irq(&global_state_lock);
1559
1560         if (side == C_SYNC_TARGET)
1561                 mdev->bm_resync_fo = 0;
1562
1563         /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1564          * with w_send_oos, or the sync target will get confused as to
1565          * how much bits to resync.  We cannot do that always, because for an
1566          * empty resync and protocol < 95, we need to do it here, as we call
1567          * drbd_resync_finished from here in that case.
1568          * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1569          * and from after_state_ch otherwise. */
1570         if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1571                 drbd_gen_and_send_sync_uuid(mdev);
1572
1573         if (r == SS_SUCCESS) {
1574                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1575                      drbd_conn_str(ns.conn),
1576                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1577                      (unsigned long) mdev->rs_total);
1578
1579                 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1580                         /* This still has a race (about when exactly the peers
1581                          * detect connection loss) that can lead to a full sync
1582                          * on next handshake. In 8.3.9 we fixed this with explicit
1583                          * resync-finished notifications, but the fix
1584                          * introduces a protocol change.  Sleeping for some
1585                          * time longer than the ping interval + timeout on the
1586                          * SyncSource, to give the SyncTarget the chance to
1587                          * detect connection loss, then waiting for a ping
1588                          * response (implicit in drbd_resync_finished) reduces
1589                          * the race considerably, but does not solve it. */
1590                         if (side == C_SYNC_SOURCE)
1591                                 schedule_timeout_interruptible(
1592                                         mdev->net_conf->ping_int * HZ +
1593                                         mdev->net_conf->ping_timeo*HZ/9);
1594                         drbd_resync_finished(mdev);
1595                 }
1596
1597                 drbd_rs_controller_reset(mdev);
1598                 /* ns.conn may already be != mdev->state.conn,
1599                  * we may have been paused in between, or become paused until
1600                  * the timer triggers.
1601                  * No matter, that is handled in resync_timer_fn() */
1602                 if (ns.conn == C_SYNC_TARGET)
1603                         mod_timer(&mdev->resync_timer, jiffies);
1604
1605                 drbd_md_sync(mdev);
1606         }
1607         put_ldev(mdev);
1608         drbd_state_unlock(mdev);
1609 }
1610
1611 int drbd_worker(struct drbd_thread *thi)
1612 {
1613         struct drbd_conf *mdev = thi->mdev;
1614         struct drbd_work *w = NULL;
1615         LIST_HEAD(work_list);
1616         int intr = 0, i;
1617
1618         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1619
1620         while (get_t_state(thi) == Running) {
1621                 drbd_thread_current_set_cpu(mdev);
1622
1623                 if (down_trylock(&mdev->data.work.s)) {
1624                         mutex_lock(&mdev->data.mutex);
1625                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1626                                 drbd_tcp_uncork(mdev->data.socket);
1627                         mutex_unlock(&mdev->data.mutex);
1628
1629                         intr = down_interruptible(&mdev->data.work.s);
1630
1631                         mutex_lock(&mdev->data.mutex);
1632                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1633                                 drbd_tcp_cork(mdev->data.socket);
1634                         mutex_unlock(&mdev->data.mutex);
1635                 }
1636
1637                 if (intr) {
1638                         D_ASSERT(intr == -EINTR);
1639                         flush_signals(current);
1640                         ERR_IF (get_t_state(thi) == Running)
1641                                 continue;
1642                         break;
1643                 }
1644
1645                 if (get_t_state(thi) != Running)
1646                         break;
1647                 /* With this break, we have done a down() but not consumed
1648                    the entry from the list. The cleanup code takes care of
1649                    this...   */
1650
1651                 w = NULL;
1652                 spin_lock_irq(&mdev->data.work.q_lock);
1653                 ERR_IF(list_empty(&mdev->data.work.q)) {
1654                         /* something terribly wrong in our logic.
1655                          * we were able to down() the semaphore,
1656                          * but the list is empty... doh.
1657                          *
1658                          * what is the best thing to do now?
1659                          * try again from scratch, restarting the receiver,
1660                          * asender, whatnot? could break even more ugly,
1661                          * e.g. when we are primary, but no good local data.
1662                          *
1663                          * I'll try to get away just starting over this loop.
1664                          */
1665                         spin_unlock_irq(&mdev->data.work.q_lock);
1666                         continue;
1667                 }
1668                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1669                 list_del_init(&w->list);
1670                 spin_unlock_irq(&mdev->data.work.q_lock);
1671
1672                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1673                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1674                         if (mdev->state.conn >= C_CONNECTED)
1675                                 drbd_force_state(mdev,
1676                                                 NS(conn, C_NETWORK_FAILURE));
1677                 }
1678         }
1679         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1680         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1681
1682         spin_lock_irq(&mdev->data.work.q_lock);
1683         i = 0;
1684         while (!list_empty(&mdev->data.work.q)) {
1685                 list_splice_init(&mdev->data.work.q, &work_list);
1686                 spin_unlock_irq(&mdev->data.work.q_lock);
1687
1688                 while (!list_empty(&work_list)) {
1689                         w = list_entry(work_list.next, struct drbd_work, list);
1690                         list_del_init(&w->list);
1691                         w->cb(mdev, w, 1);
1692                         i++; /* dead debugging code */
1693                 }
1694
1695                 spin_lock_irq(&mdev->data.work.q_lock);
1696         }
1697         sema_init(&mdev->data.work.s, 0);
1698         /* DANGEROUS race: if someone did queue his work within the spinlock,
1699          * but up() ed outside the spinlock, we could get an up() on the
1700          * semaphore without corresponding list entry.
1701          * So don't do that.
1702          */
1703         spin_unlock_irq(&mdev->data.work.q_lock);
1704
1705         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1706         /* _drbd_set_state only uses stop_nowait.
1707          * wait here for the Exiting receiver. */
1708         drbd_thread_stop(&mdev->receiver);
1709         drbd_mdev_cleanup(mdev);
1710
1711         dev_info(DEV, "worker terminated\n");
1712
1713         clear_bit(DEVICE_DYING, &mdev->flags);
1714         clear_bit(CONFIG_PENDING, &mdev->flags);
1715         wake_up(&mdev->state_wait);
1716
1717         return 0;
1718 }