drbd: introduce P_ZEROES (REQ_OP_WRITE_ZEROES on the "wire")
linux-2.6-block.git: drivers/block/drbd/drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = blk_status_to_errno(bio->bi_status);
67
68         /* special case: drbd_md_read() during drbd_adm_attach() */
69         if (device->ldev)
70                 put_ldev(device);
71         bio_put(bio);
72
73         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74          * to timeout on the lower level device, and eventually detach from it.
75          * If this io completion runs after that timeout expired, this
76          * drbd_md_put_buffer() may allow us to finally try and re-attach.
77          * During normal operation, this only puts that extra reference
78          * down to 1 again.
79          * Make sure we first drop the reference, and only then signal
80          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81          * next drbd_md_sync_page_io() that we trigger the
82          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
83          */
84         drbd_md_put_buffer(device);
85         device->md_io.done = 1;
86         wake_up(&device->misc_wait);
87 }
88
89 /* reads on behalf of the partner,
90  * "submitted" by the receiver
91  */
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
93 {
94         unsigned long flags = 0;
95         struct drbd_peer_device *peer_device = peer_req->peer_device;
96         struct drbd_device *device = peer_device->device;
97
98         spin_lock_irqsave(&device->resource->req_lock, flags);
99         device->read_cnt += peer_req->i.size >> 9;
100         list_del(&peer_req->w.list);
101         if (list_empty(&device->read_ee))
102                 wake_up(&device->ee_wait);
103         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
105         spin_unlock_irqrestore(&device->resource->req_lock, flags);
106
107         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108         put_ldev(device);
109 }
110
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
114 {
115         unsigned long flags = 0;
116         struct drbd_peer_device *peer_device = peer_req->peer_device;
117         struct drbd_device *device = peer_device->device;
118         struct drbd_connection *connection = peer_device->connection;
119         struct drbd_interval i;
120         int do_wake;
121         u64 block_id;
122         int do_al_complete_io;
123
124         /* after we moved peer_req to done_ee,
125          * we may no longer access it,
126          * it may be freed/reused already!
127          * (as soon as we release the req_lock) */
128         i = peer_req->i;
129         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130         block_id = peer_req->block_id;
131         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
132
133         if (peer_req->flags & EE_WAS_ERROR) {
134                 /* In protocol != C, we usually do not send write acks.
135                  * In case of a write error, send the neg ack anyway. */
136                 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137                         inc_unacked(device);
138                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
139         }
140
141         spin_lock_irqsave(&device->resource->req_lock, flags);
142         device->writ_cnt += peer_req->i.size >> 9;
143         list_move_tail(&peer_req->w.list, &device->done_ee);
144
145         /*
146          * Do not remove from the write_requests tree here: we did not send the
147          * Ack yet and did not wake possibly waiting conflicting requests.
148          * It is removed from the tree in "drbd_process_done_ee", within the
149          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150          * _drbd_clear_done_ee.
151          */
152
153         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
154
155         /* FIXME do we want to detach for failed REQ_OP_DISCARD?
156          * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
157         if (peer_req->flags & EE_WAS_ERROR)
158                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
159
160         if (connection->cstate >= C_WF_REPORT_PARAMS) {
161                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163                         kref_put(&device->kref, drbd_destroy_device);
164         }
165         spin_unlock_irqrestore(&device->resource->req_lock, flags);
166
167         if (block_id == ID_SYNCER)
168                 drbd_rs_complete_io(device, i.sector);
169
170         if (do_wake)
171                 wake_up(&device->ee_wait);
172
173         if (do_al_complete_io)
174                 drbd_al_complete_io(device, &i);
175
176         put_ldev(device);
177 }
178
179 /* writes on behalf of the partner, or resync writes,
180  * "submitted" by the receiver.
181  */
182 void drbd_peer_request_endio(struct bio *bio)
183 {
184         struct drbd_peer_request *peer_req = bio->bi_private;
185         struct drbd_device *device = peer_req->peer_device->device;
186         bool is_write = bio_data_dir(bio) == WRITE;
187         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188                           bio_op(bio) == REQ_OP_DISCARD;
189
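        /* Note: REQ_OP_WRITE_ZEROES and REQ_OP_DISCARD are only lumped
         * together here for the sake of the warning below; both are
         * reported as "discard" failures in the log. */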
190         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191                 drbd_warn(device, "%s: error=%d s=%llus\n",
192                                 is_write ? (is_discard ? "discard" : "write")
193                                         : "read", bio->bi_status,
194                                 (unsigned long long)peer_req->i.sector);
195
196         if (bio->bi_status)
197                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
198
199         bio_put(bio); /* no need for the bio anymore */
200         if (atomic_dec_and_test(&peer_req->pending_bios)) {
201                 if (is_write)
202                         drbd_endio_write_sec_final(peer_req);
203                 else
204                         drbd_endio_read_sec_final(peer_req);
205         }
206 }
207
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
210 {
211         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212                 device->minor, device->resource->name, device->vnr);
213 }
214
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
216  */
217 void drbd_request_endio(struct bio *bio)
218 {
219         unsigned long flags;
220         struct drbd_request *req = bio->bi_private;
221         struct drbd_device *device = req->device;
222         struct bio_and_error m;
223         enum drbd_req_event what;
224
225         /* If this request was aborted locally before,
226          * but now was completed "successfully",
227          * chances are that this caused arbitrary data corruption.
228          *
229          * "aborting" requests, or force-detaching the disk, is intended for
230          * completely blocked/hung local backing devices which no longer
231          * complete requests at all, not even error completions.  In this
232          * situation, usually a hard-reset and failover is the only way out.
233          *
234          * By "aborting", basically faking a local error-completion,
235          * we allow for a more graceful switchover by cleanly migrating services.
236          * Still the affected node has to be rebooted "soon".
237          *
238          * By completing these requests, we allow the upper layers to re-use
239          * the associated data pages.
240          *
241          * If later the local backing device "recovers", and now DMAs some data
242          * from disk into the original request pages, in the best case it will
243          * just put random data into unused pages; but typically it will corrupt
244          * meanwhile completely unrelated data, causing all sorts of damage.
245          *
246          * Which means delayed successful completion,
247          * especially for READ requests,
248          * is a reason to panic().
249          *
250          * We assume that a delayed *error* completion is OK,
251          * though we still will complain noisily about it.
252          */
253         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254                 if (__ratelimit(&drbd_ratelimit_state))
255                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
256
257                 if (!bio->bi_status)
258                         drbd_panic_after_delayed_completion_of_aborted_request(device);
259         }
260
261         /* to avoid recursion in __req_mod */
262         if (unlikely(bio->bi_status)) {
263                 switch (bio_op(bio)) {
264                 case REQ_OP_WRITE_ZEROES:
265                 case REQ_OP_DISCARD:
266                         if (bio->bi_status == BLK_STS_NOTSUPP)
267                                 what = DISCARD_COMPLETED_NOTSUPP;
268                         else
269                                 what = DISCARD_COMPLETED_WITH_ERROR;
270                         break;
271                 case REQ_OP_READ:
272                         if (bio->bi_opf & REQ_RAHEAD)
273                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
274                         else
275                                 what = READ_COMPLETED_WITH_ERROR;
276                         break;
277                 default:
278                         what = WRITE_COMPLETED_WITH_ERROR;
279                         break;
280                 }
281         } else {
282                 what = COMPLETED_OK;
283         }
284
285         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286         bio_put(bio);
287
288         /* not req_mod(), we need irqsave here! */
289         spin_lock_irqsave(&device->resource->req_lock, flags);
290         __req_mod(req, what, &m);
291         spin_unlock_irqrestore(&device->resource->req_lock, flags);
292         put_ldev(device);
293
294         if (m.bio)
295                 complete_master_bio(device, &m);
296 }
297
298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
299 {
300         SHASH_DESC_ON_STACK(desc, tfm);
301         struct page *page = peer_req->pages;
302         struct page *tmp;
303         unsigned len;
304         void *src;
305
306         desc->tfm = tfm;
307         desc->flags = 0;
308
309         crypto_shash_init(desc);
310
311         src = kmap_atomic(page);
312         while ((tmp = page_chain_next(page))) {
313                 /* all but the last page will be fully used */
314                 crypto_shash_update(desc, src, PAGE_SIZE);
315                 kunmap_atomic(src);
316                 page = tmp;
317                 src = kmap_atomic(page);
318         }
319         /* and now the last, possibly only partially used page */
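        /* (if i.size is an exact multiple of PAGE_SIZE, the masking below
         *  yields len == 0, and "len ?: PAGE_SIZE" falls back to hashing
         *  the full last page) */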
320         len = peer_req->i.size & (PAGE_SIZE - 1);
321         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
322         kunmap_atomic(src);
323
324         crypto_shash_final(desc, digest);
325         shash_desc_zero(desc);
326 }
327
328 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
329 {
330         SHASH_DESC_ON_STACK(desc, tfm);
331         struct bio_vec bvec;
332         struct bvec_iter iter;
333
334         desc->tfm = tfm;
335         desc->flags = 0;
336
337         crypto_shash_init(desc);
338
339         bio_for_each_segment(bvec, bio, iter) {
340                 u8 *src;
341
342                 src = kmap_atomic(bvec.bv_page);
343                 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
344                 kunmap_atomic(src);
345
346                 /* REQ_OP_WRITE_SAME has only one segment,
347                  * checksum the payload only once. */
348                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
349                         break;
350         }
351         crypto_shash_final(desc, digest);
352         shash_desc_zero(desc);
353 }
354
355 /* MAYBE merge common code with w_e_end_ov_req */
356 static int w_e_send_csum(struct drbd_work *w, int cancel)
357 {
358         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
359         struct drbd_peer_device *peer_device = peer_req->peer_device;
360         struct drbd_device *device = peer_device->device;
361         int digest_size;
362         void *digest;
363         int err = 0;
364
365         if (unlikely(cancel))
366                 goto out;
367
368         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
369                 goto out;
370
371         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
372         digest = kmalloc(digest_size, GFP_NOIO);
373         if (digest) {
374                 sector_t sector = peer_req->i.sector;
375                 unsigned int size = peer_req->i.size;
376                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
377                 /* Free peer_req and pages before send.
378                  * In case we block on congestion, we could otherwise run into
379                  * some distributed deadlock, if the other side blocks on
380                  * congestion as well, because our receiver blocks in
381                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
382                 drbd_free_peer_req(device, peer_req);
383                 peer_req = NULL;
384                 inc_rs_pending(device);
385                 err = drbd_send_drequest_csum(peer_device, sector, size,
386                                               digest, digest_size,
387                                               P_CSUM_RS_REQUEST);
388                 kfree(digest);
389         } else {
390                 drbd_err(device, "kmalloc() of digest failed.\n");
391                 err = -ENOMEM;
392         }
393
394 out:
395         if (peer_req)
396                 drbd_free_peer_req(device, peer_req);
397
398         if (unlikely(err))
399                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
400         return err;
401 }
402
403 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
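/* Note: GFP_TRY sets no reclaim flags, so these allocations may fail
 * quickly (and silently, thanks to __GFP_NOWARN) rather than block;
 * callers such as read_for_csum() simply defer and retry later. */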
404
405 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
406 {
407         struct drbd_device *device = peer_device->device;
408         struct drbd_peer_request *peer_req;
409
410         if (!get_ldev(device))
411                 return -EIO;
412
413         /* GFP_TRY, because if there is no memory available right now, this may
414          * be rescheduled for later. It is "only" background resync, after all. */
415         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
416                                        size, size, GFP_TRY);
417         if (!peer_req)
418                 goto defer;
419
420         peer_req->w.cb = w_e_send_csum;
421         spin_lock_irq(&device->resource->req_lock);
422         list_add_tail(&peer_req->w.list, &device->read_ee);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         atomic_add(size >> 9, &device->rs_sect_ev);
426         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
427                                      DRBD_FAULT_RS_RD) == 0)
428                 return 0;
429
430         /* If it failed because of ENOMEM, retry should help.  If it failed
431          * because bio_add_page failed (probably broken lower level driver),
432          * retry may or may not help.
433          * If it does not, you may need to force disconnect. */
434         spin_lock_irq(&device->resource->req_lock);
435         list_del(&peer_req->w.list);
436         spin_unlock_irq(&device->resource->req_lock);
437
438         drbd_free_peer_req(device, peer_req);
439 defer:
440         put_ldev(device);
441         return -EAGAIN;
442 }
443
444 int w_resync_timer(struct drbd_work *w, int cancel)
445 {
446         struct drbd_device *device =
447                 container_of(w, struct drbd_device, resync_work);
448
449         switch (device->state.conn) {
450         case C_VERIFY_S:
451                 make_ov_request(device, cancel);
452                 break;
453         case C_SYNC_TARGET:
454                 make_resync_request(device, cancel);
455                 break;
456         }
457
458         return 0;
459 }
460
461 void resync_timer_fn(struct timer_list *t)
462 {
463         struct drbd_device *device = from_timer(device, t, resync_timer);
464
465         drbd_queue_work_if_unqueued(
466                 &first_peer_device(device)->connection->sender_work,
467                 &device->resync_work);
468 }
469
470 static void fifo_set(struct fifo_buffer *fb, int value)
471 {
472         int i;
473
474         for (i = 0; i < fb->size; i++)
475                 fb->values[i] = value;
476 }
477
478 static int fifo_push(struct fifo_buffer *fb, int value)
479 {
480         int ov;
481
482         ov = fb->values[fb->head_index];
483         fb->values[fb->head_index++] = value;
484
485         if (fb->head_index >= fb->size)
486                 fb->head_index = 0;
487
488         return ov;
489 }
490
491 static void fifo_add_val(struct fifo_buffer *fb, int value)
492 {
493         int i;
494
495         for (i = 0; i < fb->size; i++)
496                 fb->values[i] += value;
497 }
498
499 struct fifo_buffer *fifo_alloc(int fifo_size)
500 {
501         struct fifo_buffer *fb;
502
503         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
504         if (!fb)
505                 return NULL;
506
507         fb->head_index = 0;
508         fb->size = fifo_size;
509         fb->total = 0;
510
511         return fb;
512 }
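/* Note on the fifo_buffer helpers above: the buffer is used as a fixed-size
 * ring holding one "planned correction" per future controller step.
 * fifo_push(fb, v) stores v at the head and returns the entry it replaces,
 * so pushing the newest plan value simultaneously pops the one that is due
 * now; fifo_add_val() spreads an additional correction over all steps.
 *
 * Illustrative example (hypothetical numbers): with size == 3 and contents
 * {4, 4, 4}, fifo_add_val(fb, 2) gives {6, 6, 6}; a subsequent
 * fifo_push(fb, 0) returns 6 (the correction to apply in this step) and
 * leaves {0, 6, 6} for the following two steps. */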
513
514 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
515 {
516         struct disk_conf *dc;
517         unsigned int want;     /* The number of sectors we want in-flight */
518         int req_sect; /* Number of sectors to request in this turn */
519         int correction; /* Number of sectors more we need in-flight */
520         int cps; /* correction per invocation of drbd_rs_controller() */
521         int steps; /* Number of time steps to plan ahead */
522         int curr_corr;
523         int max_sect;
524         struct fifo_buffer *plan;
525
526         dc = rcu_dereference(device->ldev->disk_conf);
527         plan = rcu_dereference(device->rs_plan_s);
528
529         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
530
531         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
532                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
533         } else { /* normal path */
534                 want = dc->c_fill_target ? dc->c_fill_target :
535                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
536         }
537
538         correction = want - device->rs_in_flight - plan->total;
539
540         /* Plan ahead */
541         cps = correction / steps;
542         fifo_add_val(plan, cps);
543         plan->total += cps * steps;
544
545         /* What we do in this step */
546         curr_corr = fifo_push(plan, 0);
547         plan->total -= curr_corr;
548
549         req_sect = sect_in + curr_corr;
550         if (req_sect < 0)
551                 req_sect = 0;
552
553         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
554         if (req_sect > max_sect)
555                 req_sect = max_sect;
556
557         /*
558         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
559                  sect_in, device->rs_in_flight, want, correction,
560                  steps, cps, device->rs_planed, curr_corr, req_sect);
561         */
562
563         return req_sect;
564 }
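/* Worked example for drbd_rs_controller() (illustrative numbers only):
 * assume SLEEP_TIME is 100 ms, a plan of steps == 10, c_fill_target == 0,
 * c_delay_target == 10 (i.e. aim for roughly one second worth of data in
 * flight), and sect_in == 200 sectors received during the last interval.
 * Then
 *     want       = 200 * 10 * HZ / (SLEEP_TIME * 10) = 2000 sectors,
 * and with rs_in_flight == 1500 and plan->total == 300,
 *     correction = 2000 - 1500 - 300 = 200,
 *     cps        = 200 / 10 = 20 added to every plan slot.
 * The request size for this step is sect_in + curr_corr, where curr_corr is
 * the slot popped by fifo_push() above, capped at max_sect. */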
565
566 static int drbd_rs_number_requests(struct drbd_device *device)
567 {
568         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
569         int number, mxb;
570
571         sect_in = atomic_xchg(&device->rs_sect_in, 0);
572         device->rs_in_flight -= sect_in;
573
574         rcu_read_lock();
575         mxb = drbd_get_max_buffers(device) / 2;
576         if (rcu_dereference(device->rs_plan_s)->size) {
577                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
578                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
579         } else {
580                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
581                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
582         }
583         rcu_read_unlock();
584
585         /* Don't have more than "max-buffers"/2 in-flight.
586          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
587          * potentially causing a distributed deadlock on congestion during
588          * online-verify or (checksum-based) resync, if max-buffers,
589          * socket buffer sizes and resync rate settings are mis-configured. */
590
591         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
592          * mxb (as used here, and in drbd_alloc_pages on the peer) is
593          * "number of pages" (typically also 4k),
594          * but "rs_in_flight" is in "sectors" (512 Byte). */
595         if (mxb - device->rs_in_flight/8 < number)
596                 number = mxb - device->rs_in_flight/8;
597
598         return number;
599 }
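/* Unit check for the cap above (illustrative numbers): "number" counts
 * BM_BLOCK_SIZE (4 KiB) blocks, mxb counts pages (typically also 4 KiB),
 * and rs_in_flight counts 512-byte sectors, hence the division by 8.
 * E.g. with max-buffers 2000 (mxb == 1000) and 2048 sectors (== 256 blocks)
 * already in flight, at most 1000 - 256 = 744 requests are issued this
 * turn, no matter what the rate controller asked for. */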
600
601 static int make_resync_request(struct drbd_device *const device, int cancel)
602 {
603         struct drbd_peer_device *const peer_device = first_peer_device(device);
604         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
605         unsigned long bit;
606         sector_t sector;
607         const sector_t capacity = drbd_get_capacity(device->this_bdev);
608         int max_bio_size;
609         int number, rollback_i, size;
610         int align, requeue = 0;
611         int i = 0;
612         int discard_granularity = 0;
613
614         if (unlikely(cancel))
615                 return 0;
616
617         if (device->rs_total == 0) {
618                 /* empty resync? */
619                 drbd_resync_finished(device);
620                 return 0;
621         }
622
623         if (!get_ldev(device)) {
624                 /* Since we only need to access device->rsync, a
625                    get_ldev_if_state(device, D_FAILED) would be sufficient; but
626                    continuing resync with a broken disk makes no sense at
627                    all. */
628                 drbd_err(device, "Disk broke down during resync!\n");
629                 return 0;
630         }
631
632         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
633                 rcu_read_lock();
634                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
635                 rcu_read_unlock();
636         }
637
638         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
639         number = drbd_rs_number_requests(device);
640         if (number <= 0)
641                 goto requeue;
642
643         for (i = 0; i < number; i++) {
644                 /* Stop generating RS requests when half of the send buffer is filled,
645                  * but notify TCP that we'd like to have more space. */
646                 mutex_lock(&connection->data.mutex);
647                 if (connection->data.socket) {
648                         struct sock *sk = connection->data.socket->sk;
649                         int queued = sk->sk_wmem_queued;
650                         int sndbuf = sk->sk_sndbuf;
651                         if (queued > sndbuf / 2) {
652                                 requeue = 1;
653                                 if (sk->sk_socket)
654                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
655                         }
656                 } else
657                         requeue = 1;
658                 mutex_unlock(&connection->data.mutex);
659                 if (requeue)
660                         goto requeue;
661
662 next_sector:
663                 size = BM_BLOCK_SIZE;
664                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
665
666                 if (bit == DRBD_END_OF_BITMAP) {
667                         device->bm_resync_fo = drbd_bm_bits(device);
668                         put_ldev(device);
669                         return 0;
670                 }
671
672                 sector = BM_BIT_TO_SECT(bit);
673
674                 if (drbd_try_rs_begin_io(device, sector)) {
675                         device->bm_resync_fo = bit;
676                         goto requeue;
677                 }
678                 device->bm_resync_fo = bit + 1;
679
680                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
681                         drbd_rs_complete_io(device, sector);
682                         goto next_sector;
683                 }
684
685 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
686                 /* try to find some adjacent bits.
687                  * we stop once we have reached the maximum request size.
688                  *
689                  * Additionally always align bigger requests, in order to
690                  * be prepared for all stripe sizes of software RAIDs.
691                  */
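                /* Note: in the check below, (1 << (align+3)) sectors equals
                 * (BM_BLOCK_SIZE << align) bytes, i.e. the size this request
                 * would grow to next; we only keep merging while the start
                 * sector is naturally aligned to that size, so merged
                 * requests stay aligned as promised above. */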
692                 align = 1;
693                 rollback_i = i;
694                 while (i < number) {
695                         if (size + BM_BLOCK_SIZE > max_bio_size)
696                                 break;
697
698                         /* Be always aligned */
699                         if (sector & ((1<<(align+3))-1))
700                                 break;
701
702                         if (discard_granularity && size == discard_granularity)
703                                 break;
704
705                         /* do not cross extent boundaries */
706                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
707                                 break;
708                         /* now, is it actually dirty, after all?
709                          * caution, drbd_bm_test_bit is tri-state for some
710                          * obscure reason; ( b == 0 ) would get the out-of-band
711                          * only accidentally right because of the "oddly sized"
712                          * adjustment below */
713                         if (drbd_bm_test_bit(device, bit+1) != 1)
714                                 break;
715                         bit++;
716                         size += BM_BLOCK_SIZE;
717                         if ((BM_BLOCK_SIZE << align) <= size)
718                                 align++;
719                         i++;
720                 }
721                 /* if we merged some,
722                  * reset the offset to start the next drbd_bm_find_next from */
723                 if (size > BM_BLOCK_SIZE)
724                         device->bm_resync_fo = bit + 1;
725 #endif
726
727                 /* adjust very last sectors, in case we are oddly sized */
728                 if (sector + (size>>9) > capacity)
729                         size = (capacity-sector)<<9;
730
731                 if (device->use_csums) {
732                         switch (read_for_csum(peer_device, sector, size)) {
733                         case -EIO: /* Disk failure */
734                                 put_ldev(device);
735                                 return -EIO;
736                         case -EAGAIN: /* allocation failed, or ldev busy */
737                                 drbd_rs_complete_io(device, sector);
738                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
739                                 i = rollback_i;
740                                 goto requeue;
741                         case 0:
742                                 /* everything ok */
743                                 break;
744                         default:
745                                 BUG();
746                         }
747                 } else {
748                         int err;
749
750                         inc_rs_pending(device);
751                         err = drbd_send_drequest(peer_device,
752                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
753                                                  sector, size, ID_SYNCER);
754                         if (err) {
755                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
756                                 dec_rs_pending(device);
757                                 put_ldev(device);
758                                 return err;
759                         }
760                 }
761         }
762
763         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
764                 /* last syncer _request_ was sent,
765                  * but the P_RS_DATA_REPLY has not been received yet.  The sync will
766                  * end (and the next sync group will resume) as soon as we receive
767                  * the last resync data block and the last bit is cleared.
768                  * Until then, resync "work" is "inactive" ...
769                  */
770                 put_ldev(device);
771                 return 0;
772         }
773
774  requeue:
775         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
776         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
777         put_ldev(device);
778         return 0;
779 }
780
781 static int make_ov_request(struct drbd_device *device, int cancel)
782 {
783         int number, i, size;
784         sector_t sector;
785         const sector_t capacity = drbd_get_capacity(device->this_bdev);
786         bool stop_sector_reached = false;
787
788         if (unlikely(cancel))
789                 return 1;
790
791         number = drbd_rs_number_requests(device);
792
793         sector = device->ov_position;
794         for (i = 0; i < number; i++) {
795                 if (sector >= capacity)
796                         return 1;
797
798                 /* We check for "finished" only in the reply path:
799                  * w_e_end_ov_reply().
800                  * We need to send at least one request out. */
801                 stop_sector_reached = i > 0
802                         && verify_can_do_stop_sector(device)
803                         && sector >= device->ov_stop_sector;
804                 if (stop_sector_reached)
805                         break;
806
807                 size = BM_BLOCK_SIZE;
808
809                 if (drbd_try_rs_begin_io(device, sector)) {
810                         device->ov_position = sector;
811                         goto requeue;
812                 }
813
814                 if (sector + (size>>9) > capacity)
815                         size = (capacity-sector)<<9;
816
817                 inc_rs_pending(device);
818                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
819                         dec_rs_pending(device);
820                         return 0;
821                 }
822                 sector += BM_SECT_PER_BIT;
823         }
824         device->ov_position = sector;
825
826  requeue:
827         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
828         if (i == 0 || !stop_sector_reached)
829                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
830         return 1;
831 }
832
833 int w_ov_finished(struct drbd_work *w, int cancel)
834 {
835         struct drbd_device_work *dw =
836                 container_of(w, struct drbd_device_work, w);
837         struct drbd_device *device = dw->device;
838         kfree(dw);
839         ov_out_of_sync_print(device);
840         drbd_resync_finished(device);
841
842         return 0;
843 }
844
845 static int w_resync_finished(struct drbd_work *w, int cancel)
846 {
847         struct drbd_device_work *dw =
848                 container_of(w, struct drbd_device_work, w);
849         struct drbd_device *device = dw->device;
850         kfree(dw);
851
852         drbd_resync_finished(device);
853
854         return 0;
855 }
856
857 static void ping_peer(struct drbd_device *device)
858 {
859         struct drbd_connection *connection = first_peer_device(device)->connection;
860
861         clear_bit(GOT_PING_ACK, &connection->flags);
862         request_ping(connection);
863         wait_event(connection->ping_wait,
864                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
865 }
866
867 int drbd_resync_finished(struct drbd_device *device)
868 {
869         struct drbd_connection *connection = first_peer_device(device)->connection;
870         unsigned long db, dt, dbdt;
871         unsigned long n_oos;
872         union drbd_state os, ns;
873         struct drbd_device_work *dw;
874         char *khelper_cmd = NULL;
875         int verify_done = 0;
876
877         /* Remove all elements from the resync LRU. Since future actions
878          * might set bits in the (main) bitmap, the entries in the
879          * resync LRU would otherwise be wrong. */
880         if (drbd_rs_del_all(device)) {
881                 /* In case this is not possible now, most probably because
882                  * there are P_RS_DATA_REPLY packets lingering on the worker's
883                  * queue (or even the read operations for those packets
884                  * are not finished by now).  Retry in 100ms. */
885
886                 schedule_timeout_interruptible(HZ / 10);
887                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
888                 if (dw) {
889                         dw->w.cb = w_resync_finished;
890                         dw->device = device;
891                         drbd_queue_work(&connection->sender_work, &dw->w);
892                         return 1;
893                 }
894                 drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
895         }
896
897         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
898         if (dt <= 0)
899                 dt = 1;
900
901         db = device->rs_total;
902         /* adjust for verify start and stop sectors, respectively the position reached */
903         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
904                 db -= device->ov_left;
905
906         dbdt = Bit2KB(db/dt);
907         device->rs_paused /= HZ;
908
909         if (!get_ldev(device))
910                 goto out;
911
912         ping_peer(device);
913
914         spin_lock_irq(&device->resource->req_lock);
915         os = drbd_read_state(device);
916
917         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
918
919         /* This protects us against multiple calls (that can happen in the presence
920            of application IO), and against connectivity loss just before we arrive here. */
921         if (os.conn <= C_CONNECTED)
922                 goto out_unlock;
923
924         ns = os;
925         ns.conn = C_CONNECTED;
926
927         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
928              verify_done ? "Online verify" : "Resync",
929              dt + device->rs_paused, device->rs_paused, dbdt);
930
931         n_oos = drbd_bm_total_weight(device);
932
933         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
934                 if (n_oos) {
935                         drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
936                               n_oos, Bit2KB(1));
937                         khelper_cmd = "out-of-sync";
938                 }
939         } else {
940                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
941
942                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
943                         khelper_cmd = "after-resync-target";
944
945                 if (device->use_csums && device->rs_total) {
946                         const unsigned long s = device->rs_same_csum;
947                         const unsigned long t = device->rs_total;
948                         const int ratio =
949                                 (t == 0)     ? 0 :
950                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
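                        /* (both branches compute the same percentage; the
                         *  split keeps precision for small t while avoiding
                         *  a possible overflow of s*100 when s and t are
                         *  large and unsigned long is 32 bit) */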
951                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
952                              "transferred %luK total %luK\n",
953                              ratio,
954                              Bit2KB(device->rs_same_csum),
955                              Bit2KB(device->rs_total - device->rs_same_csum),
956                              Bit2KB(device->rs_total));
957                 }
958         }
959
960         if (device->rs_failed) {
961                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
962
963                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
964                         ns.disk = D_INCONSISTENT;
965                         ns.pdsk = D_UP_TO_DATE;
966                 } else {
967                         ns.disk = D_UP_TO_DATE;
968                         ns.pdsk = D_INCONSISTENT;
969                 }
970         } else {
971                 ns.disk = D_UP_TO_DATE;
972                 ns.pdsk = D_UP_TO_DATE;
973
974                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
975                         if (device->p_uuid) {
976                                 int i;
977                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
978                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
979                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
980                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
981                         } else {
982                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
983                         }
984                 }
985
986                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
987                         /* for verify runs, we don't update uuids here,
988                          * so there would be nothing to report. */
989                         drbd_uuid_set_bm(device, 0UL);
990                         drbd_print_uuids(device, "updated UUIDs");
991                         if (device->p_uuid) {
992                                 /* Now the two UUID sets are equal, update what we
993                                  * know of the peer. */
994                                 int i;
995                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
996                                         device->p_uuid[i] = device->ldev->md.uuid[i];
997                         }
998                 }
999         }
1000
1001         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1002 out_unlock:
1003         spin_unlock_irq(&device->resource->req_lock);
1004
1005         /* If we have been sync source, and have an effective fencing-policy,
1006          * once *all* volumes are back in sync, call "unfence". */
1007         if (os.conn == C_SYNC_SOURCE) {
1008                 enum drbd_disk_state disk_state = D_MASK;
1009                 enum drbd_disk_state pdsk_state = D_MASK;
1010                 enum drbd_fencing_p fp = FP_DONT_CARE;
1011
1012                 rcu_read_lock();
1013                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1014                 if (fp != FP_DONT_CARE) {
1015                         struct drbd_peer_device *peer_device;
1016                         int vnr;
1017                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1018                                 struct drbd_device *device = peer_device->device;
1019                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1020                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1021                         }
1022                 }
1023                 rcu_read_unlock();
1024                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1025                         conn_khelper(connection, "unfence-peer");
1026         }
1027
1028         put_ldev(device);
1029 out:
1030         device->rs_total  = 0;
1031         device->rs_failed = 0;
1032         device->rs_paused = 0;
1033
1034         /* reset start sector, if we reached end of device */
1035         if (verify_done && device->ov_left == 0)
1036                 device->ov_start_sector = 0;
1037
1038         drbd_md_sync(device);
1039
1040         if (khelper_cmd)
1041                 drbd_khelper(device, khelper_cmd);
1042
1043         return 1;
1044 }
1045
1046 /* helper */
1047 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1048 {
1049         if (drbd_peer_req_has_active_page(peer_req)) {
1050                 /* This might happen if sendpage() has not finished */
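                /* (the pages were handed to the network layer by reference;
                 *  park the request on net_ee and move its page accounting
                 *  from pp_in_use to pp_in_use_by_net until the peer_req
                 *  can really be freed) */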
1051                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1052                 atomic_add(i, &device->pp_in_use_by_net);
1053                 atomic_sub(i, &device->pp_in_use);
1054                 spin_lock_irq(&device->resource->req_lock);
1055                 list_add_tail(&peer_req->w.list, &device->net_ee);
1056                 spin_unlock_irq(&device->resource->req_lock);
1057                 wake_up(&drbd_pp_wait);
1058         } else
1059                 drbd_free_peer_req(device, peer_req);
1060 }
1061
1062 /**
1063  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1064  * @w:          work object.
1065  * @cancel:     The connection will be closed anyway
1066  */
1067 int w_e_end_data_req(struct drbd_work *w, int cancel)
1068 {
1069         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1070         struct drbd_peer_device *peer_device = peer_req->peer_device;
1071         struct drbd_device *device = peer_device->device;
1072         int err;
1073
1074         if (unlikely(cancel)) {
1075                 drbd_free_peer_req(device, peer_req);
1076                 dec_unacked(device);
1077                 return 0;
1078         }
1079
1080         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1081                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1082         } else {
1083                 if (__ratelimit(&drbd_ratelimit_state))
1084                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1085                             (unsigned long long)peer_req->i.sector);
1086
1087                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1088         }
1089
1090         dec_unacked(device);
1091
1092         move_to_net_ee_or_free(device, peer_req);
1093
1094         if (unlikely(err))
1095                 drbd_err(device, "drbd_send_block() failed\n");
1096         return err;
1097 }
1098
1099 static bool all_zero(struct drbd_peer_request *peer_req)
1100 {
1101         struct page *page = peer_req->pages;
1102         unsigned int len = peer_req->i.size;
1103
1104         page_chain_for_each(page) {
1105                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1106                 unsigned int i, words = l / sizeof(long);
1107                 unsigned long *d;
1108
1109                 d = kmap_atomic(page);
1110                 for (i = 0; i < words; i++) {
1111                         if (d[i]) {
1112                                 kunmap_atomic(d);
1113                                 return false;
1114                         }
1115                 }
1116                 kunmap_atomic(d);
1117                 len -= l;
1118         }
1119
1120         return true;
1121 }
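/* Note: all_zero() supports thin-provisioning aware resync: if the peer
 * asked with P_RS_THIN_REQ (EE_RS_THIN_REQ is set) and the local read came
 * back all zeroes, w_e_end_rsdata_req() below answers with
 * drbd_send_rs_deallocated() instead of shipping a full block of zeroes. */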
1122
1123 /**
1124  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1125  * @w:          work object.
1126  * @cancel:     The connection will be closed anyway
1127  */
1128 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1129 {
1130         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1131         struct drbd_peer_device *peer_device = peer_req->peer_device;
1132         struct drbd_device *device = peer_device->device;
1133         int err;
1134
1135         if (unlikely(cancel)) {
1136                 drbd_free_peer_req(device, peer_req);
1137                 dec_unacked(device);
1138                 return 0;
1139         }
1140
1141         if (get_ldev_if_state(device, D_FAILED)) {
1142                 drbd_rs_complete_io(device, peer_req->i.sector);
1143                 put_ldev(device);
1144         }
1145
1146         if (device->state.conn == C_AHEAD) {
1147                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1148         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1149                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1150                         inc_rs_pending(device);
1151                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1152                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1153                         else
1154                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1155                 } else {
1156                         if (__ratelimit(&drbd_ratelimit_state))
1157                                 drbd_err(device, "Not sending RSDataReply, "
1158                                     "partner DISKLESS!\n");
1159                         err = 0;
1160                 }
1161         } else {
1162                 if (__ratelimit(&drbd_ratelimit_state))
1163                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1164                             (unsigned long long)peer_req->i.sector);
1165
1166                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1167
1168                 /* update resync data with failure */
1169                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1170         }
1171
1172         dec_unacked(device);
1173
1174         move_to_net_ee_or_free(device, peer_req);
1175
1176         if (unlikely(err))
1177                 drbd_err(device, "drbd_send_block() failed\n");
1178         return err;
1179 }
1180
1181 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1182 {
1183         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1184         struct drbd_peer_device *peer_device = peer_req->peer_device;
1185         struct drbd_device *device = peer_device->device;
1186         struct digest_info *di;
1187         int digest_size;
1188         void *digest = NULL;
1189         int err, eq = 0;
1190
1191         if (unlikely(cancel)) {
1192                 drbd_free_peer_req(device, peer_req);
1193                 dec_unacked(device);
1194                 return 0;
1195         }
1196
1197         if (get_ldev(device)) {
1198                 drbd_rs_complete_io(device, peer_req->i.sector);
1199                 put_ldev(device);
1200         }
1201
1202         di = peer_req->digest;
1203
1204         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1205                 /* quick hack to try to avoid a race against reconfiguration.
1206                  * a real fix would be much more involved,
1207                  * introducing more locking mechanisms */
1208                 if (peer_device->connection->csums_tfm) {
1209                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1210                         D_ASSERT(device, digest_size == di->digest_size);
1211                         digest = kmalloc(digest_size, GFP_NOIO);
1212                 }
1213                 if (digest) {
1214                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1215                         eq = !memcmp(digest, di->digest, digest_size);
1216                         kfree(digest);
1217                 }
1218
1219                 if (eq) {
1220                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1221                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1222                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1223                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1224                 } else {
1225                         inc_rs_pending(device);
1226                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1227                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1228                         kfree(di);
1229                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1230                 }
1231         } else {
1232                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1233                 if (__ratelimit(&drbd_ratelimit_state))
1234                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1235         }
1236
1237         dec_unacked(device);
1238         move_to_net_ee_or_free(device, peer_req);
1239
1240         if (unlikely(err))
1241                 drbd_err(device, "drbd_send_block/ack() failed\n");
1242         return err;
1243 }
1244
1245 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1246 {
1247         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1248         struct drbd_peer_device *peer_device = peer_req->peer_device;
1249         struct drbd_device *device = peer_device->device;
1250         sector_t sector = peer_req->i.sector;
1251         unsigned int size = peer_req->i.size;
1252         int digest_size;
1253         void *digest;
1254         int err = 0;
1255
1256         if (unlikely(cancel))
1257                 goto out;
1258
1259         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1260         digest = kmalloc(digest_size, GFP_NOIO);
1261         if (!digest) {
1262                 err = 1;        /* terminate the connection in case the allocation failed */
1263                 goto out;
1264         }
1265
1266         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1267                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1268         else
1269                 memset(digest, 0, digest_size);
1270
1271         /* Free e and pages before send.
1272          * In case we block on congestion, we could otherwise run into
1273          * some distributed deadlock, if the other side blocks on
1274          * congestion as well, because our receiver blocks in
1275          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1276         drbd_free_peer_req(device, peer_req);
1277         peer_req = NULL;
1278         inc_rs_pending(device);
1279         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1280         if (err)
1281                 dec_rs_pending(device);
1282         kfree(digest);
1283
1284 out:
1285         if (peer_req)
1286                 drbd_free_peer_req(device, peer_req);
1287         dec_unacked(device);
1288         return err;
1289 }
1290
1291 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1292 {
1293         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1294                 device->ov_last_oos_size += size>>9;
1295         } else {
1296                 device->ov_last_oos_start = sector;
1297                 device->ov_last_oos_size = size>>9;
1298         }
1299         drbd_set_out_of_sync(device, sector, size);
1300 }
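/* Note: drbd_ov_out_of_sync_found() coalesces adjacent out-of-sync results
 * into one range (ov_last_oos_start/ov_last_oos_size, in sectors), so that
 * ov_out_of_sync_print() can report a single line per contiguous region;
 * the accumulated range is flushed as soon as an in-sync block ends the
 * run, see w_e_end_ov_reply() below. */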
1301
1302 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1303 {
1304         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1305         struct drbd_peer_device *peer_device = peer_req->peer_device;
1306         struct drbd_device *device = peer_device->device;
1307         struct digest_info *di;
1308         void *digest;
1309         sector_t sector = peer_req->i.sector;
1310         unsigned int size = peer_req->i.size;
1311         int digest_size;
1312         int err, eq = 0;
1313         bool stop_sector_reached = false;
1314
1315         if (unlikely(cancel)) {
1316                 drbd_free_peer_req(device, peer_req);
1317                 dec_unacked(device);
1318                 return 0;
1319         }
1320
1321         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1322          * the resync lru has been cleaned up already */
1323         if (get_ldev(device)) {
1324                 drbd_rs_complete_io(device, peer_req->i.sector);
1325                 put_ldev(device);
1326         }
1327
1328         di = peer_req->digest;
1329
1330         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1331                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1332                 digest = kmalloc(digest_size, GFP_NOIO);
1333                 if (digest) {
1334                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1335
1336                         D_ASSERT(device, digest_size == di->digest_size);
1337                         eq = !memcmp(digest, di->digest, digest_size);
1338                         kfree(digest);
1339                 }
1340         }
1341
1342         /* Free peer_req and pages before send.
1343          * In case we block on congestion, we could otherwise run into
1344          * some distributed deadlock, if the other side blocks on
1345          * congestion as well, because our receiver blocks in
1346          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1347         drbd_free_peer_req(device, peer_req);
1348         if (!eq)
1349                 drbd_ov_out_of_sync_found(device, sector, size);
1350         else
1351                 ov_out_of_sync_print(device);
1352
1353         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1354                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1355
1356         dec_unacked(device);
1357
1358         --device->ov_left;
1359
1360         /* let's advance progress step marks only for every other megabyte */
1361         if ((device->ov_left & 0x200) == 0x200)
1362                 drbd_advance_rs_marks(device, device->ov_left);
1363
1364         stop_sector_reached = verify_can_do_stop_sector(device) &&
1365                 (sector + (size>>9)) >= device->ov_stop_sector;
1366
1367         if (device->ov_left == 0 || stop_sector_reached) {
1368                 ov_out_of_sync_print(device);
1369                 drbd_resync_finished(device);
1370         }
1371
1372         return err;
1373 }
1374
1375 /* FIXME
1376  * We need to track the number of pending barrier acks,
1377  * and to be able to wait for them.
1378  * See also comment in drbd_adm_attach before drbd_suspend_io.
1379  */
1380 static int drbd_send_barrier(struct drbd_connection *connection)
1381 {
1382         struct p_barrier *p;
1383         struct drbd_socket *sock;
1384
1385         sock = &connection->data;
1386         p = conn_prepare_command(connection, sock);
1387         if (!p)
1388                 return -EIO;
1389         p->barrier = connection->send.current_epoch_nr;
1390         p->pad = 0;
1391         connection->send.current_epoch_writes = 0;
1392         connection->send.last_sent_barrier_jif = jiffies;
1393
1394         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1395 }
1396
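/* Send P_UNPLUG_REMOTE: a hint that we have nothing more queued right now,
 * so the peer may kick (unplug) its backing device queue. */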
1397 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1398 {
1399         struct drbd_socket *sock = &pd->connection->data;
1400         if (!drbd_prepare_command(pd, sock))
1401                 return -EIO;
1402         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1403 }
1404
1405 int w_send_write_hint(struct drbd_work *w, int cancel)
1406 {
1407         struct drbd_device *device =
1408                 container_of(w, struct drbd_device, unplug_work);
1409
1410         if (cancel)
1411                 return 0;
1412         return pd_send_unplug_remote(first_peer_device(device));
1413 }
1414
1415 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1416 {
1417         if (!connection->send.seen_any_write_yet) {
1418                 connection->send.seen_any_write_yet = true;
1419                 connection->send.current_epoch_nr = epoch;
1420                 connection->send.current_epoch_writes = 0;
1421                 connection->send.last_sent_barrier_jif = jiffies;
1422         }
1423 }
1424
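/* Close the previous write epoch with a P_BARRIER if the request we are
 * about to send belongs to a newer epoch and the previous epoch actually
 * contained writes. */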
1425 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1426 {
1427         /* nothing to do until we have seen the first write on this connection */
1428         if (!connection->send.seen_any_write_yet)
1429                 return;
1430         if (connection->send.current_epoch_nr != epoch) {
1431                 if (connection->send.current_epoch_writes)
1432                         drbd_send_barrier(connection);
1433                 connection->send.current_epoch_nr = epoch;
1434         }
1435 }
1436
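/**
 * w_send_out_of_sync() - Worker callback to send a P_OUT_OF_SYNC packet instead of mirroring the write (AHEAD mode)
 * @w:          work object.
 * @cancel:     The connection will be closed anyways
 */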
1437 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1438 {
1439         struct drbd_request *req = container_of(w, struct drbd_request, w);
1440         struct drbd_device *device = req->device;
1441         struct drbd_peer_device *const peer_device = first_peer_device(device);
1442         struct drbd_connection *const connection = peer_device->connection;
1443         int err;
1444
1445         if (unlikely(cancel)) {
1446                 req_mod(req, SEND_CANCELED);
1447                 return 0;
1448         }
1449         req->pre_send_jif = jiffies;
1450
1451         /* this time, no connection->send.current_epoch_writes++;
1452          * If a barrier gets sent here, it is the closing barrier for the last
1453          * replicated epoch, before we went into AHEAD mode.
1454          * No more barriers will be sent until we leave AHEAD mode again. */
1455         maybe_send_barrier(connection, req->epoch);
1456
1457         err = drbd_send_out_of_sync(peer_device, req);
1458         req_mod(req, OOS_HANDED_TO_NETWORK);
1459
1460         return err;
1461 }
1462
1463 /**
1464  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1465  * @w:          work object.
1466  * @cancel:     The connection will be closed anyways
1467  */
1468 int w_send_dblock(struct drbd_work *w, int cancel)
1469 {
1470         struct drbd_request *req = container_of(w, struct drbd_request, w);
1471         struct drbd_device *device = req->device;
1472         struct drbd_peer_device *const peer_device = first_peer_device(device);
1473         struct drbd_connection *connection = peer_device->connection;
1474         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1475         int err;
1476
1477         if (unlikely(cancel)) {
1478                 req_mod(req, SEND_CANCELED);
1479                 return 0;
1480         }
1481         req->pre_send_jif = jiffies;
1482
1483         re_init_if_first_write(connection, req->epoch);
1484         maybe_send_barrier(connection, req->epoch);
1485         connection->send.current_epoch_writes++;
1486
1487         err = drbd_send_dblock(peer_device, req);
1488         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1489
1490         if (do_send_unplug && !err)
1491                 pd_send_unplug_remote(peer_device);
1492
1493         return err;
1494 }
1495
1496 /**
1497  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1498  * @w:          work object.
1499  * @cancel:     The connection will be closed anyways
1500  */
1501 int w_send_read_req(struct drbd_work *w, int cancel)
1502 {
1503         struct drbd_request *req = container_of(w, struct drbd_request, w);
1504         struct drbd_device *device = req->device;
1505         struct drbd_peer_device *const peer_device = first_peer_device(device);
1506         struct drbd_connection *connection = peer_device->connection;
1507         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1508         int err;
1509
1510         if (unlikely(cancel)) {
1511                 req_mod(req, SEND_CANCELED);
1512                 return 0;
1513         }
1514         req->pre_send_jif = jiffies;
1515
1516         /* Even read requests may close a write epoch,
1517          * if there has been one. */
1518         maybe_send_barrier(connection, req->epoch);
1519
1520         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1521                                  (unsigned long)req);
1522
1523         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1524
1525         if (do_send_unplug && !err)
1526                 pd_send_unplug_remote(peer_device);
1527
1528         return err;
1529 }
1530
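/* Re-submit the original master bio to our local backing device, e.g. after
 * IO had been suspended and is resumed again; writes that were already
 * accounted in the activity log re-acquire their AL extent first. */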
1531 int w_restart_disk_io(struct drbd_work *w, int cancel)
1532 {
1533         struct drbd_request *req = container_of(w, struct drbd_request, w);
1534         struct drbd_device *device = req->device;
1535
1536         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1537                 drbd_al_begin_io(device, &req->i);
1538
1539         drbd_req_make_private_bio(req, req->master_bio);
1540         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1541         generic_make_request(req->private_bio);
1542
1543         return 0;
1544 }
1545
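/* Walk the resync-after dependency chain (disk_conf->resync_after names the
 * minor this device is configured to resync after).  Return 1 if this device
 * may resync now, 0 if a device it depends on is itself syncing or paused.
 * Example: with a chain 2 -> 1 -> 0, minor 2 stays paused while 1 or 0 sync. */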
1546 static int _drbd_may_sync_now(struct drbd_device *device)
1547 {
1548         struct drbd_device *odev = device;
1549         int resync_after;
1550
1551         while (1) {
1552                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1553                         return 1;
1554                 rcu_read_lock();
1555                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1556                 rcu_read_unlock();
1557                 if (resync_after == -1)
1558                         return 1;
1559                 odev = minor_to_device(resync_after);
1560                 if (!odev)
1561                         return 1;
1562                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1563                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1564                     odev->state.aftr_isp || odev->state.peer_isp ||
1565                     odev->state.user_isp)
1566                         return 0;
1567         }
1568 }
1569
1570 /**
1571  * drbd_pause_after() - Pause resync on all devices that may not resync now
1572  * @device:     DRBD device.
1573  *
1574  * Called from process context only (admin command and after_state_ch).
1575  */
1576 static bool drbd_pause_after(struct drbd_device *device)
1577 {
1578         bool changed = false;
1579         struct drbd_device *odev;
1580         int i;
1581
1582         rcu_read_lock();
1583         idr_for_each_entry(&drbd_devices, odev, i) {
1584                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1585                         continue;
1586                 if (!_drbd_may_sync_now(odev) &&
1587                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1588                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1589                         changed = true;
1590         }
1591         rcu_read_unlock();
1592
1593         return changed;
1594 }
1595
1596 /**
1597  * drbd_resume_next() - Resume resync on all devices that may resync now
1598  * @device:     DRBD device.
1599  *
1600  * Called from process context only (admin command and worker).
1601  */
1602 static bool drbd_resume_next(struct drbd_device *device)
1603 {
1604         bool changed = false;
1605         struct drbd_device *odev;
1606         int i;
1607
1608         rcu_read_lock();
1609         idr_for_each_entry(&drbd_devices, odev, i) {
1610                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1611                         continue;
1612                 if (odev->state.aftr_isp) {
1613                         if (_drbd_may_sync_now(odev) &&
1614                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1615                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1616                                 changed = true;
1617                 }
1618         }
1619         rcu_read_unlock();
1620         return changed;
1621 }
1622
1623 void resume_next_sg(struct drbd_device *device)
1624 {
1625         lock_all_resources();
1626         drbd_resume_next(device);
1627         unlock_all_resources();
1628 }
1629
1630 void suspend_other_sg(struct drbd_device *device)
1631 {
1632         lock_all_resources();
1633         drbd_pause_after(device);
1634         unlock_all_resources();
1635 }
1636
1637 /* caller must lock_all_resources() */
1638 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1639 {
1640         struct drbd_device *odev;
1641         int resync_after;
1642
1643         if (o_minor == -1)
1644                 return NO_ERROR;
1645         if (o_minor < -1 || o_minor > MINORMASK)
1646                 return ERR_RESYNC_AFTER;
1647
1648         /* check for loops */
1649         odev = minor_to_device(o_minor);
1650         while (1) {
1651                 if (odev == device)
1652                         return ERR_RESYNC_AFTER_CYCLE;
1653
1654                 /* You are free to depend on diskless, non-existing,
1655                  * or not yet/no longer existing minors.
1656                  * We only reject dependency loops.
1657                  * We cannot follow the dependency chain beyond a detached or
1658                  * missing minor.
1659                  */
1660                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1661                         return NO_ERROR;
1662
1663                 rcu_read_lock();
1664                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1665                 rcu_read_unlock();
1666                 /* dependency chain ends here, no cycles. */
1667                 if (resync_after == -1)
1668                         return NO_ERROR;
1669
1670                 /* follow the dependency chain */
1671                 odev = minor_to_device(resync_after);
1672         }
1673 }
1674
1675 /* caller must lock_all_resources() */
1676 void drbd_resync_after_changed(struct drbd_device *device)
1677 {
1678         int changed;
1679
1680         do {
1681                 changed  = drbd_pause_after(device);
1682                 changed |= drbd_resume_next(device);
1683         } while (changed);
1684 }
1685
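/* Reset the dynamic resync rate controller: clear the sector counters and
 * empty the fifo plan, so the controller starts from a clean state when the
 * next resync begins. */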
1686 void drbd_rs_controller_reset(struct drbd_device *device)
1687 {
1688         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1689         struct fifo_buffer *plan;
1690
1691         atomic_set(&device->rs_sect_in, 0);
1692         atomic_set(&device->rs_sect_ev, 0);
1693         device->rs_in_flight = 0;
1694         device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1695
1696         /* Updating the RCU protected object in place is necessary since
1697            this function gets called from atomic context.
1698            It is valid since all other updates also lead to a completely
1699            empty fifo */
1700         rcu_read_lock();
1701         plan = rcu_dereference(device->rs_plan_s);
1702         plan->total = 0;
1703         fifo_set(plan, 0);
1704         rcu_read_unlock();
1705 }
1706
1707 void start_resync_timer_fn(struct timer_list *t)
1708 {
1709         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1710         drbd_device_post_work(device, RS_START);
1711 }
1712
1713 static void do_start_resync(struct drbd_device *device)
1714 {
1715         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1716                 drbd_warn(device, "postponing start_resync ...\n");
1717                 device->start_resync_timer.expires = jiffies + HZ/10;
1718                 add_timer(&device->start_resync_timer);
1719                 return;
1720         }
1721
1722         drbd_start_resync(device, C_SYNC_SOURCE);
1723         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1724 }
1725
1726 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1727 {
1728         bool csums_after_crash_only;
1729         rcu_read_lock();
1730         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1731         rcu_read_unlock();
1732         return connection->agreed_pro_version >= 89 &&          /* supported? */
1733                 connection->csums_tfm &&                        /* configured? */
1734                 (csums_after_crash_only == false                /* use for each resync? */
1735                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1736 }
1737
1738 /**
1739  * drbd_start_resync() - Start the resync process
1740  * @device:     DRBD device.
1741  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1742  *
1743  * This function might bring you directly into one of the
1744  * C_PAUSED_SYNC_* states.
1745  */
1746 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1747 {
1748         struct drbd_peer_device *peer_device = first_peer_device(device);
1749         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1750         union drbd_state ns;
1751         int r;
1752
1753         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1754                 drbd_err(device, "Resync already running!\n");
1755                 return;
1756         }
1757
1758         if (!connection) {
1759                 drbd_err(device, "No connection to peer, aborting!\n");
1760                 return;
1761         }
1762
1763         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1764                 if (side == C_SYNC_TARGET) {
1765                         /* Since application IO was locked out during C_WF_BITMAP_T and
1766                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1767                            which will make our data inconsistent, give the handler a chance to veto. */
1768                         r = drbd_khelper(device, "before-resync-target");
1769                         r = (r >> 8) & 0xff;
1770                         if (r > 0) {
1771                                 drbd_info(device, "before-resync-target handler returned %d, "
1772                                          "dropping connection.\n", r);
1773                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1774                                 return;
1775                         }
1776                 } else /* C_SYNC_SOURCE */ {
1777                         r = drbd_khelper(device, "before-resync-source");
1778                         r = (r >> 8) & 0xff;
1779                         if (r > 0) {
1780                                 if (r == 3) {
1781                                         drbd_info(device, "before-resync-source handler returned %d, "
1782                                                  "ignoring. Old userland tools?\n", r);
1783                                 } else {
1784                                         drbd_info(device, "before-resync-source handler returned %d, "
1785                                                  "dropping connection.\n", r);
1786                                         conn_request_state(connection,
1787                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1788                                         return;
1789                                 }
1790                         }
1791                 }
1792         }
1793
1794         if (current == connection->worker.task) {
1795                 /* The worker should not sleep waiting for state_mutex,
1796                    as that can take a long time */
1797                 if (!mutex_trylock(device->state_mutex)) {
1798                         set_bit(B_RS_H_DONE, &device->flags);
1799                         device->start_resync_timer.expires = jiffies + HZ/5;
1800                         add_timer(&device->start_resync_timer);
1801                         return;
1802                 }
1803         } else {
1804                 mutex_lock(device->state_mutex);
1805         }
1806
1807         lock_all_resources();
1808         clear_bit(B_RS_H_DONE, &device->flags);
1809         /* Did some connection breakage or IO error race with us? */
1810         if (device->state.conn < C_CONNECTED
1811         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1812                 unlock_all_resources();
1813                 goto out;
1814         }
1815
1816         ns = drbd_read_state(device);
1817
1818         ns.aftr_isp = !_drbd_may_sync_now(device);
1819
1820         ns.conn = side;
1821
1822         if (side == C_SYNC_TARGET)
1823                 ns.disk = D_INCONSISTENT;
1824         else /* side == C_SYNC_SOURCE */
1825                 ns.pdsk = D_INCONSISTENT;
1826
1827         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1828         ns = drbd_read_state(device);
1829
1830         if (ns.conn < C_CONNECTED)
1831                 r = SS_UNKNOWN_ERROR;
1832
1833         if (r == SS_SUCCESS) {
1834                 unsigned long tw = drbd_bm_total_weight(device);
1835                 unsigned long now = jiffies;
1836                 int i;
1837
1838                 device->rs_failed    = 0;
1839                 device->rs_paused    = 0;
1840                 device->rs_same_csum = 0;
1841                 device->rs_last_sect_ev = 0;
1842                 device->rs_total     = tw;
1843                 device->rs_start     = now;
1844                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1845                         device->rs_mark_left[i] = tw;
1846                         device->rs_mark_time[i] = now;
1847                 }
1848                 drbd_pause_after(device);
1849                 /* Forget potentially stale cached per resync extent bit-counts.
1850                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1851                  * disabled, and know the disk state is ok. */
1852                 spin_lock(&device->al_lock);
1853                 lc_reset(device->resync);
1854                 device->resync_locked = 0;
1855                 device->resync_wenr = LC_FREE;
1856                 spin_unlock(&device->al_lock);
1857         }
1858         unlock_all_resources();
1859
1860         if (r == SS_SUCCESS) {
1861                 wake_up(&device->al_wait); /* for lc_reset() above */
1862                 /* reset rs_last_bcast when a resync or verify is started,
1863                  * to deal with potential jiffies wrap. */
1864                 device->rs_last_bcast = jiffies - HZ;
1865
1866                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1867                      drbd_conn_str(ns.conn),
1868                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1869                      (unsigned long) device->rs_total);
1870                 if (side == C_SYNC_TARGET) {
1871                         device->bm_resync_fo = 0;
1872                         device->use_csums = use_checksum_based_resync(connection, device);
1873                 } else {
1874                         device->use_csums = false;
1875                 }
1876
1877                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1878                  * with w_send_oos, or the sync target will get confused as to
1879                  * how many bits to resync.  We cannot always do that, because for an
1880                  * empty resync and protocol < 95, we need to do it here, as we call
1881                  * drbd_resync_finished from here in that case.
1882                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1883                  * and from after_state_ch otherwise. */
1884                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1885                         drbd_gen_and_send_sync_uuid(peer_device);
1886
1887                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1888                         /* This still has a race (about when exactly the peers
1889                          * detect connection loss) that can lead to a full sync
1890                          * on next handshake. In 8.3.9 we fixed this with explicit
1891                          * resync-finished notifications, but the fix
1892                          * introduces a protocol change.  Sleeping for some
1893                          * time longer than the ping interval + timeout on the
1894                          * SyncSource, to give the SyncTarget the chance to
1895                          * detect connection loss, then waiting for a ping
1896                          * response (implicit in drbd_resync_finished) reduces
1897                          * the race considerably, but does not solve it. */
1898                         if (side == C_SYNC_SOURCE) {
1899                                 struct net_conf *nc;
1900                                 int timeo;
1901
1902                                 rcu_read_lock();
1903                                 nc = rcu_dereference(connection->net_conf);
1904                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1905                                 rcu_read_unlock();
1906                                 schedule_timeout_interruptible(timeo);
1907                         }
1908                         drbd_resync_finished(device);
1909                 }
1910
1911                 drbd_rs_controller_reset(device);
1912                 /* ns.conn may already be != device->state.conn,
1913                  * we may have been paused in between, or become paused until
1914                  * the timer triggers.
1915                  * No matter, that is handled in resync_timer_fn() */
1916                 if (ns.conn == C_SYNC_TARGET)
1917                         mod_timer(&device->resync_timer, jiffies);
1918
1919                 drbd_md_sync(device);
1920         }
1921         put_ldev(device);
1922 out:
1923         mutex_unlock(device->state_mutex);
1924 }
1925
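/* Lazily write out changed bitmap pages and broadcast the resync progress to
 * userspace (SIB_SYNC_PROGRESS); if the resync is done and we are still in a
 * sync state, also finish it here. */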
1926 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1927 {
1928         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1929         device->rs_last_bcast = jiffies;
1930
1931         if (!get_ldev(device))
1932                 return;
1933
1934         drbd_bm_write_lazy(device, 0);
1935         if (resync_done && is_sync_state(device->state.conn))
1936                 drbd_resync_finished(device);
1937
1938         drbd_bcast_event(device, &sib);
1939         /* update timestamp, in case it took a while to write out stuff */
1940         device->rs_last_bcast = jiffies;
1941         put_ldev(device);
1942 }
1943
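/* Final teardown of the local backing device: free the resync and
 * activity-log LRU caches, free the ldev itself, and wake up anyone waiting
 * for GOING_DISKLESS to clear. */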
1944 static void drbd_ldev_destroy(struct drbd_device *device)
1945 {
1946         lc_destroy(device->resync);
1947         device->resync = NULL;
1948         lc_destroy(device->act_log);
1949         device->act_log = NULL;
1950
1951         __acquire(local);
1952         drbd_backing_dev_free(device, device->ldev);
1953         device->ldev = NULL;
1954         __release(local);
1955
1956         clear_bit(GOING_DISKLESS, &device->flags);
1957         wake_up(&device->misc_wait);
1958 }
1959
1960 static void go_diskless(struct drbd_device *device)
1961 {
1962         D_ASSERT(device, device->state.disk == D_FAILED);
1963         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1964          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1965          * the protected members anymore, though, so once put_ldev reaches zero
1966          * again, it will be safe to free them. */
1967
1968         /* Try to write changed bitmap pages, read errors may have just
1969          * set some bits outside the area covered by the activity log.
1970          *
1971          * If we have an IO error during the bitmap writeout,
1972          * we will want a full sync next time, just in case.
1973          * (Do we want a specific meta data flag for this?)
1974          *
1975          * If that does not make it to stable storage either,
1976          * we cannot do anything about that anymore.
1977          *
1978          * We still need to check if both bitmap and ldev are present, we may
1979          * end up here after a failed attach, before ldev was even assigned.
1980          */
1981         if (device->bitmap && device->ldev) {
1982                 /* An interrupted resync or similar is allowed to recount bits
1983                  * while we detach.
1984                  * Any modifications would not be expected anymore, though.
1985                  */
1986                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1987                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1988                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1989                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1990                                 drbd_md_sync(device);
1991                         }
1992                 }
1993         }
1994
1995         drbd_force_state(device, NS(disk, D_DISKLESS));
1996 }
1997
1998 static int do_md_sync(struct drbd_device *device)
1999 {
2000         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2001         drbd_md_sync(device);
2002         return 0;
2003 }
2004
2005 /* only called from drbd_worker thread, no locking */
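/* Record the callback about to run in a small ring buffer of
 * DRBD_THREAD_DETAILS_HIST entries, and clear the following slot so it acts
 * as the current end marker of the history. */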
2006 void __update_timing_details(
2007                 struct drbd_thread_timing_details *tdp,
2008                 unsigned int *cb_nr,
2009                 void *cb,
2010                 const char *fn, const unsigned int line)
2011 {
2012         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2013         struct drbd_thread_timing_details *td = tdp + i;
2014
2015         td->start_jif = jiffies;
2016         td->cb_addr = cb;
2017         td->caller_fn = fn;
2018         td->line = line;
2019         td->cb_nr = *cb_nr;
2020
2021         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2022         td = tdp + i;
2023         memset(td, 0, sizeof(*td));
2024
2025         ++(*cb_nr);
2026 }
2027
2028 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2029 {
2030         if (test_bit(MD_SYNC, &todo))
2031                 do_md_sync(device);
2032         if (test_bit(RS_DONE, &todo) ||
2033             test_bit(RS_PROGRESS, &todo))
2034                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2035         if (test_bit(GO_DISKLESS, &todo))
2036                 go_diskless(device);
2037         if (test_bit(DESTROY_DISK, &todo))
2038                 drbd_ldev_destroy(device);
2039         if (test_bit(RS_START, &todo))
2040                 do_start_resync(device);
2041 }
2042
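/* Bits in device->flags that are handled by do_device_work() and are
 * atomically fetched and cleared by get_work_bits() below. */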
2043 #define DRBD_DEVICE_WORK_MASK   \
2044         ((1UL << GO_DISKLESS)   \
2045         |(1UL << DESTROY_DISK)  \
2046         |(1UL << MD_SYNC)       \
2047         |(1UL << RS_START)      \
2048         |(1UL << RS_PROGRESS)   \
2049         |(1UL << RS_DONE)       \
2050         )
2051
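/* Atomically fetch-and-clear the device work bits without taking a lock:
 * retry the cmpxchg until no concurrent flag update raced with us. */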
2052 static unsigned long get_work_bits(unsigned long *flags)
2053 {
2054         unsigned long old, new;
2055         do {
2056                 old = *flags;
2057                 new = old & ~DRBD_DEVICE_WORK_MASK;
2058         } while (cmpxchg(flags, old, new) != old);
2059         return old & DRBD_DEVICE_WORK_MASK;
2060 }
2061
2062 static void do_unqueued_work(struct drbd_connection *connection)
2063 {
2064         struct drbd_peer_device *peer_device;
2065         int vnr;
2066
2067         rcu_read_lock();
2068         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2069                 struct drbd_device *device = peer_device->device;
2070                 unsigned long todo = get_work_bits(&device->flags);
2071                 if (!todo)
2072                         continue;
2073
2074                 kref_get(&device->kref);
2075                 rcu_read_unlock();
2076                 do_device_work(device, todo);
2077                 kref_put(&device->kref, drbd_destroy_device);
2078                 rcu_read_lock();
2079         }
2080         rcu_read_unlock();
2081 }
2082
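/* Move all currently queued work items onto the caller's private list in one
 * go; returns true if there is now anything on that list. */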
2083 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2084 {
2085         spin_lock_irq(&queue->q_lock);
2086         list_splice_tail_init(&queue->q, work_list);
2087         spin_unlock_irq(&queue->q_lock);
2088         return !list_empty(work_list);
2089 }
2090
2091 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2092 {
2093         DEFINE_WAIT(wait);
2094         struct net_conf *nc;
2095         int uncork, cork;
2096
2097         dequeue_work_batch(&connection->sender_work, work_list);
2098         if (!list_empty(work_list))
2099                 return;
2100
2101         /* Still nothing to do?
2102          * Maybe we still need to close the current epoch,
2103          * even if no new requests are queued yet.
2104          *
2105          * Also, poke TCP, just in case.
2106          * Then wait for new work (or signal). */
2107         rcu_read_lock();
2108         nc = rcu_dereference(connection->net_conf);
2109         uncork = nc ? nc->tcp_cork : 0;
2110         rcu_read_unlock();
2111         if (uncork) {
2112                 mutex_lock(&connection->data.mutex);
2113                 if (connection->data.socket)
2114                         drbd_tcp_uncork(connection->data.socket);
2115                 mutex_unlock(&connection->data.mutex);
2116         }
2117
2118         for (;;) {
2119                 int send_barrier;
2120                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2121                 spin_lock_irq(&connection->resource->req_lock);
2122                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2123                 if (!list_empty(&connection->sender_work.q))
2124                         list_splice_tail_init(&connection->sender_work.q, work_list);
2125                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2126                 if (!list_empty(work_list) || signal_pending(current)) {
2127                         spin_unlock_irq(&connection->resource->req_lock);
2128                         break;
2129                 }
2130
2131                 /* We found nothing new to do, no to-be-communicated request,
2132                  * no other work item.  We may still need to close the last
2133                  * epoch.  Next incoming request epoch will be connection ->
2134                  * current transfer log epoch number.  If that is different
2135                  * from the epoch of the last request we communicated, it is
2136                  * safe to send the epoch separating barrier now.
2137                  */
2138                 send_barrier =
2139                         atomic_read(&connection->current_tle_nr) !=
2140                         connection->send.current_epoch_nr;
2141                 spin_unlock_irq(&connection->resource->req_lock);
2142
2143                 if (send_barrier)
2144                         maybe_send_barrier(connection,
2145                                         connection->send.current_epoch_nr + 1);
2146
2147                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2148                         break;
2149
2150                 /* drbd_send() may have called flush_signals() */
2151                 if (get_t_state(&connection->worker) != RUNNING)
2152                         break;
2153
2154                 schedule();
2155                 /* We may be woken up for things other than new work, too,
2156                  * e.g. if the current epoch got closed,
2157                  * in which case we send the barrier above. */
2158         }
2159         finish_wait(&connection->sender_work.q_wait, &wait);
2160
2161         /* someone may have changed the config while we have been waiting above. */
2162         rcu_read_lock();
2163         nc = rcu_dereference(connection->net_conf);
2164         cork = nc ? nc->tcp_cork : 0;
2165         rcu_read_unlock();
2166         mutex_lock(&connection->data.mutex);
2167         if (connection->data.socket) {
2168                 if (cork)
2169                         drbd_tcp_cork(connection->data.socket);
2170                 else if (!uncork)
2171                         drbd_tcp_uncork(connection->data.socket);
2172         }
2173         mutex_unlock(&connection->data.mutex);
2174 }
2175
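/* Main loop of the per-connection worker thread: wait for work, handle the
 * per-device work bits flagged via DEVICE_WORK_PENDING, and run queued work
 * callbacks until the thread is asked to stop. */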
2176 int drbd_worker(struct drbd_thread *thi)
2177 {
2178         struct drbd_connection *connection = thi->connection;
2179         struct drbd_work *w = NULL;
2180         struct drbd_peer_device *peer_device;
2181         LIST_HEAD(work_list);
2182         int vnr;
2183
2184         while (get_t_state(thi) == RUNNING) {
2185                 drbd_thread_current_set_cpu(thi);
2186
2187                 if (list_empty(&work_list)) {
2188                         update_worker_timing_details(connection, wait_for_work);
2189                         wait_for_work(connection, &work_list);
2190                 }
2191
2192                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2193                         update_worker_timing_details(connection, do_unqueued_work);
2194                         do_unqueued_work(connection);
2195                 }
2196
2197                 if (signal_pending(current)) {
2198                         flush_signals(current);
2199                         if (get_t_state(thi) == RUNNING) {
2200                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2201                                 continue;
2202                         }
2203                         break;
2204                 }
2205
2206                 if (get_t_state(thi) != RUNNING)
2207                         break;
2208
2209                 if (!list_empty(&work_list)) {
2210                         w = list_first_entry(&work_list, struct drbd_work, list);
2211                         list_del_init(&w->list);
2212                         update_worker_timing_details(connection, w->cb);
2213                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2214                                 continue;
2215                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2216                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2217                 }
2218         }
2219
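        /* Shutting down: run any remaining work with cancel = 1 and handle
         * outstanding per-device work before the final device cleanup below. */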
2220         do {
2221                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2222                         update_worker_timing_details(connection, do_unqueued_work);
2223                         do_unqueued_work(connection);
2224                 }
2225                 if (!list_empty(&work_list)) {
2226                         w = list_first_entry(&work_list, struct drbd_work, list);
2227                         list_del_init(&w->list);
2228                         update_worker_timing_details(connection, w->cb);
2229                         w->cb(w, 1);
2230                 } else
2231                         dequeue_work_batch(&connection->sender_work, &work_list);
2232         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2233
2234         rcu_read_lock();
2235         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2236                 struct drbd_device *device = peer_device->device;
2237                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2238                 kref_get(&device->kref);
2239                 rcu_read_unlock();
2240                 drbd_device_cleanup(device);
2241                 kref_put(&device->kref, drbd_destroy_device);
2242                 rcu_read_lock();
2243         }
2244         rcu_read_unlock();
2245
2246         return 0;
2247 }