drbd: Converted the transfer log from mdev to tconn
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 3424d675b769a661857cdd41aa90a30e28c07b59..fa799e372babbde6ca949a8563c40a258348a695 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -56,6 +56,39 @@ static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
        part_stat_unlock();
 }
 
+static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
+                                              struct bio *bio_src)
+{
+       struct drbd_request *req;
+
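+       /* GFP_NOIO: we may already be on the writeout path, so the
+        * allocation must not recurse into further I/O for reclaim */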
+       req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
+       if (!req)
+               return NULL;
+
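+       /* private_bio is drbd's own clone of the master bio; it
+        * completes through drbd_request_endio */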
+       drbd_req_make_private_bio(req, bio_src);
+       req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
+       req->w.mdev      = mdev;
+       req->master_bio  = bio_src;
+       req->epoch       = 0;
+
+       drbd_clear_interval(&req->i);
+       req->i.sector    = bio_src->bi_sector;
+       req->i.size      = bio_src->bi_size;
+       req->i.local     = true;
+       req->i.waiting   = false;
+
+       INIT_LIST_HEAD(&req->tl_requests);
+       INIT_LIST_HEAD(&req->w.list);
+
+       return req;
+}
+
+static void drbd_req_free(struct drbd_request *req)
+{
+       mempool_free(req, drbd_request_mempool);
+}
+
+/* rw is bio_data_dir(), only READ or WRITE */
 static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
 {
        const unsigned long s = req->rq_state;
@@ -77,13 +110,13 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
                 * Other places where we set out-of-sync:
                 * READ with local io-error */
                if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
-                       drbd_set_out_of_sync(mdev, req->sector, req->size);
+                       drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
 
                if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
-                       drbd_set_in_sync(mdev, req->sector, req->size);
+                       drbd_set_in_sync(mdev, req->i.sector, req->i.size);
 
                /* one might be tempted to move the drbd_al_complete_io
-                * to the local io completion callback drbd_endio_pri.
+                * to the local io completion callback drbd_request_endio.
                 * but, if this was a mirror write, we may only
                 * drbd_al_complete_io after this is RQ_NET_DONE,
                 * otherwise the extent could be dropped from the al
@@ -95,12 +128,12 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
                if (s & RQ_LOCAL_MASK) {
                        if (get_ldev_if_state(mdev, D_FAILED)) {
                                if (s & RQ_IN_ACT_LOG)
-                                       drbd_al_complete_io(mdev, req->sector);
+                                       drbd_al_complete_io(mdev, req->i.sector);
                                put_ldev(mdev);
                        } else if (__ratelimit(&drbd_ratelimit_state)) {
                                dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu), "
                                     "but my Disk seems to have failed :(\n",
-                                    (unsigned long long) req->sector);
+                                    (unsigned long long) req->i.sector);
                        }
                }
        }
@@ -120,14 +153,15 @@ static void queue_barrier(struct drbd_conf *mdev)
        if (test_bit(CREATE_BARRIER, &mdev->flags))
                return;
 
-       b = mdev->newest_tle;
+       b = mdev->tconn->newest_tle;
        b->w.cb = w_send_barrier;
+       b->w.mdev = mdev;
        /* inc_ap_pending done here, so we won't
         * get imbalanced on connection loss.
         * dec_ap_pending will be done in got_BarrierAck
         * or (on connection loss) in tl_clear.  */
        inc_ap_pending(mdev);
-       drbd_queue_work(&mdev->data.work, &b->w);
+       drbd_queue_work(&mdev->tconn->data.work, &b->w);
        set_bit(CREATE_BARRIER, &mdev->flags);
 }
 
@@ -135,10 +169,6 @@ static void _about_to_complete_local_write(struct drbd_conf *mdev,
        struct drbd_request *req)
 {
        const unsigned long s = req->rq_state;
-       struct drbd_request *i;
-       struct drbd_epoch_entry *e;
-       struct hlist_node *n;
-       struct hlist_head *slot;
 
        /* Before we can signal completion to the upper layers,
         * we may need to close the current epoch.
@@ -148,53 +178,8 @@ static void _about_to_complete_local_write(struct drbd_conf *mdev,
         */
        if (mdev->state.conn >= C_CONNECTED &&
            (s & RQ_NET_SENT) != 0 &&
-           req->epoch == mdev->newest_tle->br_number)
+           req->epoch == mdev->tconn->newest_tle->br_number)
                queue_barrier(mdev);
-
-       /* we need to do the conflict detection stuff,
-        * if we have the ee_hash (two_primaries) and
-        * this has been on the network */
-       if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
-               const sector_t sector = req->sector;
-               const int size = req->size;
-
-               /* ASSERT:
-                * there must be no conflicting requests, since
-                * they must have been failed on the spot */
-#define OVERLAPS overlaps(sector, size, i->sector, i->size)
-               slot = tl_hash_slot(mdev, sector);
-               hlist_for_each_entry(i, n, slot, collision) {
-                       if (OVERLAPS) {
-                               dev_alert(DEV, "LOGIC BUG: completed: %p %llus +%u; "
-                                     "other: %p %llus +%u\n",
-                                     req, (unsigned long long)sector, size,
-                                     i, (unsigned long long)i->sector, i->size);
-                       }
-               }
-
-               /* maybe "wake" those conflicting epoch entries
-                * that wait for this request to finish.
-                *
-                * currently, there can be only _one_ such ee
-                * (well, or some more, which would be pending
-                * P_DISCARD_ACK not yet sent by the asender...),
-                * since we block the receiver thread upon the
-                * first conflict detection, which will wait on
-                * misc_wait.  maybe we want to assert that?
-                *
-                * anyways, if we found one,
-                * we just have to do a wake_up.  */
-#undef OVERLAPS
-#define OVERLAPS overlaps(sector, size, e->sector, e->size)
-               slot = ee_hash_slot(mdev, req->sector);
-               hlist_for_each_entry(e, n, slot, collision) {
-                       if (OVERLAPS) {
-                               wake_up(&mdev->misc_wait);
-                               break;
-                       }
-               }
-       }
-#undef OVERLAPS
 }
 
 void complete_master_bio(struct drbd_conf *mdev,
@@ -204,6 +189,20 @@ void complete_master_bio(struct drbd_conf *mdev,
        dec_ap_bio(mdev);
 }
 
+
+static void drbd_remove_request_interval(struct rb_root *root,
+                                        struct drbd_request *req)
+{
+       struct drbd_conf *mdev = req->w.mdev;
+       struct drbd_interval *i = &req->i;
+
+       drbd_remove_interval(root, i);
+
+       /* Wake up any processes waiting for this request to complete.  */
+       if (i->waiting)
+               wake_up(&mdev->misc_wait);
+}
+
 /* Helper for __req_mod().
  * Set m->bio to the master bio, if it is fit to be completed,
  * or leave it alone (it is initialized to NULL in __req_mod),
@@ -213,7 +212,7 @@ void complete_master_bio(struct drbd_conf *mdev,
 void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
 {
        const unsigned long s = req->rq_state;
-       struct drbd_conf *mdev = req->mdev;
+       struct drbd_conf *mdev = req->w.mdev;
        /* only WRITES may end up here without a master bio (on barrier ack) */
        int rw = req->master_bio ? bio_data_dir(req->master_bio) : WRITE;
 
@@ -226,18 +225,22 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
         *      the receiver,
         *      the bio_endio completion callbacks.
         */
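+       /* Local I/O must have completed before we retry conflicting peer
+        * requests; compare the POSTPONE_WRITE handling in __req_mod(). */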
+       if (s & RQ_LOCAL_PENDING)
+               return;
+       if (req->i.waiting) {
+               /* Retry all conflicting peer requests.  */
+               wake_up(&mdev->misc_wait);
+       }
        if (s & RQ_NET_QUEUED)
                return;
        if (s & RQ_NET_PENDING)
                return;
-       if (s & RQ_LOCAL_PENDING)
-               return;
 
        if (req->master_bio) {
-               /* this is data_received (remote read)
+               /* this is DATA_RECEIVED (remote read)
                 * or protocol C P_WRITE_ACK
                 * or protocol B P_RECV_ACK
-                * or protocol A "handed_over_to_network" (SendAck)
+                * or protocol A "HANDED_OVER_TO_NETWORK" (SendAck)
                 * or canceled or failed,
                 * or killed from the transfer log due to connection loss.
                 */
@@ -253,16 +256,22 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
                 * what we need to do here is just: complete the master_bio.
                 *
                 * local completion error, if any, has been stored as ERR_PTR
-                * in private_bio within drbd_endio_pri.
+                * in private_bio within drbd_request_endio.
                 */
                int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
                int error = PTR_ERR(req->private_bio);
 
                /* remove the request from the conflict detection
                 * respective block_id verification hash */
-               if (!hlist_unhashed(&req->collision))
-                       hlist_del(&req->collision);
-               else
+               if (!drbd_interval_empty(&req->i)) {
+                       struct rb_root *root;
+
+                       if (rw == WRITE)
+                               root = &mdev->write_requests;
+                       else
+                               root = &mdev->read_requests;
+                       drbd_remove_request_interval(root, req);
+               } else if (!(s & RQ_POSTPONED))
                        D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
 
                /* for writes we need to do some extra housekeeping */
@@ -272,8 +281,10 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
                /* Update disk stats */
                _drbd_end_io_acct(mdev, req);
 
-               m->error = ok ? 0 : (error ?: -EIO);
-               m->bio = req->master_bio;
+               if (!(s & RQ_POSTPONED)) {
+                       m->error = ok ? 0 : (error ?: -EIO);
+                       m->bio = req->master_bio;
+               }
                req->master_bio = NULL;
        }
 
@@ -290,100 +301,12 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
 
 static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m)
 {
-       struct drbd_conf *mdev = req->mdev;
+       struct drbd_conf *mdev = req->w.mdev;
 
        if (!is_susp(mdev->state))
                _req_may_be_done(req, m);
 }
 
-/*
- * checks whether there was an overlapping request
- * or ee already registered.
- *
- * if so, return 1, in which case this request is completed on the spot,
- * without ever being submitted or send.
- *
- * return 0 if it is ok to submit this request.
- *
- * NOTE:
- * paranoia: assume something above us is broken, and issues different write
- * requests for the same block simultaneously...
- *
- * To ensure these won't be reordered differently on both nodes, resulting in
- * diverging data sets, we discard the later one(s). Not that this is supposed
- * to happen, but this is the rationale why we also have to check for
- * conflicting requests with local origin, and why we have to do so regardless
- * of whether we allowed multiple primaries.
- *
- * BTW, in case we only have one primary, the ee_hash is empty anyways, and the
- * second hlist_for_each_entry becomes a noop. This is even simpler than to
- * grab a reference on the net_conf, and check for the two_primaries flag...
- */
-static int _req_conflicts(struct drbd_request *req)
-{
-       struct drbd_conf *mdev = req->mdev;
-       const sector_t sector = req->sector;
-       const int size = req->size;
-       struct drbd_request *i;
-       struct drbd_epoch_entry *e;
-       struct hlist_node *n;
-       struct hlist_head *slot;
-
-       D_ASSERT(hlist_unhashed(&req->collision));
-
-       if (!get_net_conf(mdev))
-               return 0;
-
-       /* BUG_ON */
-       ERR_IF (mdev->tl_hash_s == 0)
-               goto out_no_conflict;
-       BUG_ON(mdev->tl_hash == NULL);
-
-#define OVERLAPS overlaps(i->sector, i->size, sector, size)
-       slot = tl_hash_slot(mdev, sector);
-       hlist_for_each_entry(i, n, slot, collision) {
-               if (OVERLAPS) {
-                       dev_alert(DEV, "%s[%u] Concurrent local write detected! "
-                             "[DISCARD L] new: %llus +%u; "
-                             "pending: %llus +%u\n",
-                             current->comm, current->pid,
-                             (unsigned long long)sector, size,
-                             (unsigned long long)i->sector, i->size);
-                       goto out_conflict;
-               }
-       }
-
-       if (mdev->ee_hash_s) {
-               /* now, check for overlapping requests with remote origin */
-               BUG_ON(mdev->ee_hash == NULL);
-#undef OVERLAPS
-#define OVERLAPS overlaps(e->sector, e->size, sector, size)
-               slot = ee_hash_slot(mdev, sector);
-               hlist_for_each_entry(e, n, slot, collision) {
-                       if (OVERLAPS) {
-                               dev_alert(DEV, "%s[%u] Concurrent remote write detected!"
-                                     " [DISCARD L] new: %llus +%u; "
-                                     "pending: %llus +%u\n",
-                                     current->comm, current->pid,
-                                     (unsigned long long)sector, size,
-                                     (unsigned long long)e->sector, e->size);
-                               goto out_conflict;
-                       }
-               }
-       }
-#undef OVERLAPS
-
-out_no_conflict:
-       /* this is like it should be, and what we expected.
-        * our users do behave after all... */
-       put_net_conf(mdev);
-       return 0;
-
-out_conflict:
-       put_net_conf(mdev);
-       return 1;
-}
-
 /* obviously this could be coded as many single functions
  * instead of one huge switch,
  * or by putting the code directly in the respective locations
@@ -399,9 +322,11 @@ out_conflict:
 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                struct bio_and_error *m)
 {
-       struct drbd_conf *mdev = req->mdev;
+       struct drbd_conf *mdev = req->w.mdev;
        int rv = 0;
-       m->bio = NULL;
+
+       if (m)
+               m->bio = NULL;
 
        switch (what) {
        default:
@@ -410,29 +335,29 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 
        /* does not happen...
         * initialization done in drbd_req_new
-       case created:
+       case CREATED:
                break;
                */
 
-       case to_be_send: /* via network */
-               /* reached via drbd_make_request_common
+       case TO_BE_SENT: /* via network */
+               /* reached via __drbd_make_request
                 * and from w_read_retry_remote */
                D_ASSERT(!(req->rq_state & RQ_NET_MASK));
                req->rq_state |= RQ_NET_PENDING;
                inc_ap_pending(mdev);
                break;
 
-       case to_be_submitted: /* locally */
-               /* reached via drbd_make_request_common */
+       case TO_BE_SUBMITTED: /* locally */
+               /* reached via __drbd_make_request */
                D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
                req->rq_state |= RQ_LOCAL_PENDING;
                break;
 
-       case completed_ok:
+       case COMPLETED_OK:
                if (bio_data_dir(req->master_bio) == WRITE)
-                       mdev->writ_cnt += req->size>>9;
+                       mdev->writ_cnt += req->i.size >> 9;
                else
-                       mdev->read_cnt += req->size>>9;
+                       mdev->read_cnt += req->i.size >> 9;
 
                req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
                req->rq_state &= ~RQ_LOCAL_PENDING;
@@ -441,7 +366,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                put_ldev(mdev);
                break;
 
-       case write_completed_with_error:
+       case WRITE_COMPLETED_WITH_ERROR:
                req->rq_state |= RQ_LOCAL_COMPLETED;
                req->rq_state &= ~RQ_LOCAL_PENDING;
 
@@ -450,7 +375,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                put_ldev(mdev);
                break;
 
-       case read_ahead_completed_with_error:
+       case READ_AHEAD_COMPLETED_WITH_ERROR:
                /* it is legal to fail READA */
                req->rq_state |= RQ_LOCAL_COMPLETED;
                req->rq_state &= ~RQ_LOCAL_PENDING;
@@ -458,8 +383,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                put_ldev(mdev);
                break;
 
-       case read_completed_with_error:
-               drbd_set_out_of_sync(mdev, req->sector, req->size);
+       case READ_COMPLETED_WITH_ERROR:
+               drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
 
                req->rq_state |= RQ_LOCAL_COMPLETED;
                req->rq_state &= ~RQ_LOCAL_PENDING;
@@ -476,22 +401,22 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                        break;
                }
 
-               /* _req_mod(req,to_be_send); oops, recursion... */
+               /* _req_mod(req,TO_BE_SENT); oops, recursion... */
                req->rq_state |= RQ_NET_PENDING;
                inc_ap_pending(mdev);
-               /* fall through: _req_mod(req,queue_for_net_read); */
+               /* fall through: _req_mod(req,QUEUE_FOR_NET_READ); */
 
-       case queue_for_net_read:
+       case QUEUE_FOR_NET_READ:
                /* READ or READA, and
                 * no local disk,
                 * or target area marked as invalid,
                 * or just got an io-error. */
-               /* from drbd_make_request_common
+               /* from __drbd_make_request
                 * or from bio_endio during read io-error recovery */
 
                /* so we can verify the handle in the answer packet
                 * corresponding hlist_del is in _req_may_be_done() */
-               hlist_add_head(&req->collision, ar_hash_slot(mdev, req->sector));
+               drbd_insert_interval(&mdev->read_requests, &req->i);
 
                set_bit(UNPLUG_REMOTE, &mdev->flags);
 
@@ -500,15 +425,15 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
                        ? w_read_retry_remote
                        : w_send_read_req;
-               drbd_queue_work(&mdev->data.work, &req->w);
+               drbd_queue_work(&mdev->tconn->data.work, &req->w);
                break;
 
-       case queue_for_net_write:
+       case QUEUE_FOR_NET_WRITE:
                /* assert something? */
-               /* from drbd_make_request_common only */
+               /* from __drbd_make_request only */
 
-               hlist_add_head(&req->collision, tl_hash_slot(mdev, req->sector));
                /* corresponding hlist_del is in _req_may_be_done() */
+               drbd_insert_interval(&mdev->write_requests, &req->i);
 
                /* NOTE
                 * In case the req ended up on the transfer log before being
@@ -519,7 +444,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 *
                 * _req_add_to_epoch(req); this has to be after the
                 * _maybe_start_new_epoch(req); which happened in
-                * drbd_make_request_common, because we now may set the bit
+                * __drbd_make_request, because we now may set the bit
                 * again ourselves to close the current epoch.
                 *
                 * Add req to the (now) current epoch (barrier). */
@@ -529,38 +454,38 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 * hurting performance. */
                set_bit(UNPLUG_REMOTE, &mdev->flags);
 
-               /* see drbd_make_request_common,
+               /* see __drbd_make_request,
                 * just after it grabs the req_lock */
                D_ASSERT(test_bit(CREATE_BARRIER, &mdev->flags) == 0);
 
-               req->epoch = mdev->newest_tle->br_number;
+               req->epoch = mdev->tconn->newest_tle->br_number;
 
                /* increment size of current epoch */
-               mdev->newest_tle->n_writes++;
+               mdev->tconn->newest_tle->n_writes++;
 
                /* queue work item to send data */
                D_ASSERT(req->rq_state & RQ_NET_PENDING);
                req->rq_state |= RQ_NET_QUEUED;
                req->w.cb =  w_send_dblock;
-               drbd_queue_work(&mdev->data.work, &req->w);
+               drbd_queue_work(&mdev->tconn->data.work, &req->w);
 
                /* close the epoch, in case it outgrew the limit */
-               if (mdev->newest_tle->n_writes >= mdev->net_conf->max_epoch_size)
+               if (mdev->tconn->newest_tle->n_writes >= mdev->tconn->net_conf->max_epoch_size)
                        queue_barrier(mdev);
 
                break;
 
-       case queue_for_send_oos:
+       case QUEUE_FOR_SEND_OOS:
                req->rq_state |= RQ_NET_QUEUED;
                req->w.cb =  w_send_oos;
-               drbd_queue_work(&mdev->data.work, &req->w);
+               drbd_queue_work(&mdev->tconn->data.work, &req->w);
                break;
 
-       case oos_handed_to_network:
+       case OOS_HANDED_TO_NETWORK:
                /* actually the same */
-       case send_canceled:
+       case SEND_CANCELED:
                /* treat it the same */
-       case send_failed:
+       case SEND_FAILED:
                /* real cleanup will be done from tl_clear.  just update flags
                 * so it is no longer marked as on the worker queue */
                req->rq_state &= ~RQ_NET_QUEUED;
@@ -569,13 +494,13 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                _req_may_be_done_not_susp(req, m);
                break;
 
-       case handed_over_to_network:
+       case HANDED_OVER_TO_NETWORK:
                /* assert something? */
                if (bio_data_dir(req->master_bio) == WRITE)
-                       atomic_add(req->size>>9, &mdev->ap_in_flight);
+                       atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
 
                if (bio_data_dir(req->master_bio) == WRITE &&
-                   mdev->net_conf->wire_protocol == DRBD_PROT_A) {
+                   mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A) {
                        /* this is what is dangerous about protocol A:
                         * pretend it was successfully written on the peer. */
                        if (req->rq_state & RQ_NET_PENDING) {
@@ -590,17 +515,17 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                req->rq_state &= ~RQ_NET_QUEUED;
                req->rq_state |= RQ_NET_SENT;
                /* because _drbd_send_zc_bio could sleep, and may want to
-                * dereference the bio even after the "write_acked_by_peer" and
-                * "completed_ok" events came in, once we return from
+                * dereference the bio even after the "WRITE_ACKED_BY_PEER" and
+                * "COMPLETED_OK" events came in, once we return from
                 * _drbd_send_zc_bio (drbd_send_dblock), we have to check
                 * whether it is done already, and end it.  */
                _req_may_be_done_not_susp(req, m);
                break;
 
-       case read_retry_remote_canceled:
+       case READ_RETRY_REMOTE_CANCELED:
                req->rq_state &= ~RQ_NET_QUEUED;
                /* fall through, in case we raced with drbd_disconnect */
-       case connection_lost_while_pending:
+       case CONNECTION_LOST_WHILE_PENDING:
                /* transfer log cleanup after connection loss */
                /* assert something? */
                if (req->rq_state & RQ_NET_PENDING)
@@ -608,7 +533,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
                req->rq_state |= RQ_NET_DONE;
                if (req->rq_state & RQ_NET_SENT && req->rq_state & RQ_WRITE)
-                       atomic_sub(req->size>>9, &mdev->ap_in_flight);
+                       atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
 
                /* if it is still queued, we may not complete it here.
                 * it will be canceled soon. */
@@ -616,19 +541,15 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                        _req_may_be_done(req, m); /* Allowed while state.susp */
                break;
 
-       case write_acked_by_peer_and_sis:
+       case WRITE_ACKED_BY_PEER_AND_SIS:
                req->rq_state |= RQ_NET_SIS;
-       case conflict_discarded_by_peer:
+       case DISCARD_WRITE:
                /* for discarded conflicting writes of multiple primaries,
                 * there is no need to keep anything in the tl, potential
                 * node crashes are covered by the activity log. */
-               if (what == conflict_discarded_by_peer)
-                       dev_alert(DEV, "Got DiscardAck packet %llus +%u!"
-                             " DRBD is not a random data generator!\n",
-                             (unsigned long long)req->sector, req->size);
                req->rq_state |= RQ_NET_DONE;
                /* fall through */
-       case write_acked_by_peer:
+       case WRITE_ACKED_BY_PEER:
                /* protocol C; successfully written on peer.
                 * Nothing to do here.
                 * We want to keep the tl in place for all protocols, to cater
@@ -640,39 +561,50 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 * P_BARRIER_ACK, but that is an unnecessary optimization. */
 
                /* this makes it effectively the same as for: */
-       case recv_acked_by_peer:
+       case RECV_ACKED_BY_PEER:
                /* protocol B; pretends to be successfully written on peer.
-                * see also notes above in handed_over_to_network about
+                * see also notes above in HANDED_OVER_TO_NETWORK about
                 * protocol != C */
                req->rq_state |= RQ_NET_OK;
                D_ASSERT(req->rq_state & RQ_NET_PENDING);
                dec_ap_pending(mdev);
-               atomic_sub(req->size>>9, &mdev->ap_in_flight);
+               atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
                req->rq_state &= ~RQ_NET_PENDING;
                _req_may_be_done_not_susp(req, m);
                break;
 
-       case neg_acked:
+       case POSTPONE_WRITE:
+               /*
+                * If this node has already detected the write conflict, the
+                * worker will be waiting on misc_wait.  Wake it up once this
+                * request has completed locally.
+                */
+               D_ASSERT(req->rq_state & RQ_NET_PENDING);
+               req->rq_state |= RQ_POSTPONED;
+               _req_may_be_done_not_susp(req, m);
+               break;
+
+       case NEG_ACKED:
                /* assert something? */
                if (req->rq_state & RQ_NET_PENDING) {
                        dec_ap_pending(mdev);
-                       atomic_sub(req->size>>9, &mdev->ap_in_flight);
+                       atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
                }
                req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
 
                req->rq_state |= RQ_NET_DONE;
                _req_may_be_done_not_susp(req, m);
-               /* else: done by handed_over_to_network */
+               /* else: done by HANDED_OVER_TO_NETWORK */
                break;
 
-       case fail_frozen_disk_io:
+       case FAIL_FROZEN_DISK_IO:
                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
                        break;
 
                _req_may_be_done(req, m); /* Allowed while state.susp */
                break;
 
-       case restart_frozen_disk_io:
+       case RESTART_FROZEN_DISK_IO:
                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
                        break;
 
@@ -684,24 +616,24 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 
                get_ldev(mdev);
                req->w.cb = w_restart_disk_io;
-               drbd_queue_work(&mdev->data.work, &req->w);
+               drbd_queue_work(&mdev->tconn->data.work, &req->w);
                break;
 
-       case resend:
+       case RESEND:
                /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
                   before the connection loss (B&C only); only P_BARRIER_ACK was missing.
                   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
                   We ensure that the peer was not rebooted. */
                if (!(req->rq_state & RQ_NET_OK)) {
                        if (req->w.cb) {
-                               drbd_queue_work(&mdev->data.work, &req->w);
+                               drbd_queue_work(&mdev->tconn->data.work, &req->w);
                                rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
                        }
                        break;
                }
-               /* else, fall through to barrier_acked */
+               /* else, fall through to BARRIER_ACKED */
 
-       case barrier_acked:
+       case BARRIER_ACKED:
                if (!(req->rq_state & RQ_WRITE))
                        break;
 
@@ -709,18 +641,18 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                        /* barrier came in before all requests have been acked.
                         * this is bad, because if the connection is lost now,
                         * we won't be able to clean them up... */
-                       dev_err(DEV, "FIXME (barrier_acked but pending)\n");
-                       list_move(&req->tl_requests, &mdev->out_of_sequence_requests);
+                       dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
+                       list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
                }
                if ((req->rq_state & RQ_NET_MASK) != 0) {
                        req->rq_state |= RQ_NET_DONE;
-                       if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
-                               atomic_sub(req->size>>9, &mdev->ap_in_flight);
+                       if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A)
+                               atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
                }
                _req_may_be_done(req, m); /* Allowed while state.susp */
                break;
 
-       case data_received:
+       case DATA_RECEIVED:
                D_ASSERT(req->rq_state & RQ_NET_PENDING);
                dec_ap_pending(mdev);
                req->rq_state &= ~RQ_NET_PENDING;
@@ -746,14 +678,11 @@ static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int s
 
        if (mdev->state.disk == D_UP_TO_DATE)
                return 1;
-       if (mdev->state.disk >= D_OUTDATED)
-               return 0;
-       if (mdev->state.disk <  D_INCONSISTENT)
+       if (mdev->state.disk != D_INCONSISTENT)
                return 0;
-       /* state.disk == D_INCONSISTENT   We will have a look at the BitMap */
-       nr_sectors = drbd_get_capacity(mdev->this_bdev);
        esector = sector + (size >> 9) - 1;
 
+       nr_sectors = drbd_get_capacity(mdev->this_bdev);
        D_ASSERT(sector  < nr_sectors);
        D_ASSERT(esector < nr_sectors);
 
@@ -763,7 +692,30 @@ static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int s
        return 0 == drbd_bm_count_bits(mdev, sbnr, ebnr);
 }
 
-static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
+/*
+ * complete_conflicting_writes  -  wait for any conflicting write requests
+ *
+ * The write_requests tree contains all active write requests which we
+ * currently know about.  Wait for any requests to complete which conflict with
+ * the new one.
+ */
+static int complete_conflicting_writes(struct drbd_conf *mdev,
+                                      sector_t sector, int size)
+{
+       for (;;) {
+               struct drbd_interval *i;
+               int err;
+
+               i = drbd_find_overlap(&mdev->write_requests, sector, size);
+               if (!i)
+                       return 0;
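+               /* drbd_wait_misc() sleeps on misc_wait until the conflicting
+                * request completes and wakes us via i->waiting; the tree may
+                * have changed while we slept, so restart the search */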
+               err = drbd_wait_misc(mdev, i);
+               if (err)
+                       return err;
+       }
+}
+
+int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
 {
        const int rw = bio_rw(bio);
        const int size = bio->bi_size;
@@ -771,7 +723,7 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
        struct drbd_tl_epoch *b = NULL;
        struct drbd_request *req;
        int local, remote, send_oos = 0;
-       int err = -EIO;
+       int err;
        int ret = 0;
 
        /* allocate outside of all locks; */
@@ -841,6 +793,7 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
        if (!(local || remote) && !is_susp(mdev->state)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
+               err = -EIO;
                goto fail_free_complete;
        }
 
@@ -851,7 +804,7 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio, uns
         * spinlock, and grabbing the spinlock.
         * if we lost that race, we retry.  */
        if (rw == WRITE && (remote || send_oos) &&
-           mdev->unused_spare_tle == NULL &&
+           mdev->tconn->unused_spare_tle == NULL &&
            test_bit(CREATE_BARRIER, &mdev->flags)) {
 allocate_barrier:
                b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
@@ -863,7 +816,20 @@ allocate_barrier:
        }
 
        /* GOOD, everything prepared, grab the spin_lock */
-       spin_lock_irq(&mdev->req_lock);
+       spin_lock_irq(&mdev->tconn->req_lock);
+
+       if (rw == WRITE) {
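+               /* Called with req_lock held; the wait inside drops and
+                * re-acquires it.  Any error other than an interrupted wait
+                * is treated as a timeout towards the peer. */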
+               err = complete_conflicting_writes(mdev, sector, size);
+               if (err) {
+                       if (err != -ERESTARTSYS)
+                               _conn_request_state(mdev->tconn,
+                                                   NS(conn, C_TIMEOUT),
+                                                   CS_HARD);
+                       spin_unlock_irq(&mdev->tconn->req_lock);
+                       err = -EIO;
+                       goto fail_free_complete;
+               }
+       }
 
        if (is_susp(mdev->state)) {
                /* If we got suspended, use the retry mechanism of
@@ -871,7 +837,7 @@ allocate_barrier:
                   bio. In the next call to drbd_make_request
                   we sleep in inc_ap_bio() */
                ret = 1;
-               spin_unlock_irq(&mdev->req_lock);
+               spin_unlock_irq(&mdev->tconn->req_lock);
                goto fail_free_complete;
        }
 
@@ -884,21 +850,22 @@ allocate_barrier:
                        dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
                if (!(local || remote)) {
                        dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
-                       spin_unlock_irq(&mdev->req_lock);
+                       spin_unlock_irq(&mdev->tconn->req_lock);
+                       err = -EIO;
                        goto fail_free_complete;
                }
        }
 
-       if (b && mdev->unused_spare_tle == NULL) {
-               mdev->unused_spare_tle = b;
+       if (b && mdev->tconn->unused_spare_tle == NULL) {
+               mdev->tconn->unused_spare_tle = b;
                b = NULL;
        }
        if (rw == WRITE && (remote || send_oos) &&
-           mdev->unused_spare_tle == NULL &&
+           mdev->tconn->unused_spare_tle == NULL &&
            test_bit(CREATE_BARRIER, &mdev->flags)) {
                /* someone closed the current epoch
                 * while we were grabbing the spinlock */
-               spin_unlock_irq(&mdev->req_lock);
+               spin_unlock_irq(&mdev->tconn->req_lock);
                goto allocate_barrier;
        }
 
@@ -916,10 +883,10 @@ allocate_barrier:
         * barrier packet.  To get the write ordering right, we only have to
         * make sure that, if this is a write request and it triggered a
         * barrier packet, this request is queued within the same spinlock. */
-       if ((remote || send_oos) && mdev->unused_spare_tle &&
+       if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
            test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
-               _tl_add_barrier(mdev, mdev->unused_spare_tle);
-               mdev->unused_spare_tle = NULL;
+               _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
+               mdev->tconn->unused_spare_tle = NULL;
        } else {
                D_ASSERT(!(remote && rw == WRITE &&
                           test_bit(CREATE_BARRIER, &mdev->flags)));
@@ -941,17 +908,11 @@ allocate_barrier:
        /* mark them early for readability.
         * this just sets some state flags. */
        if (remote)
-               _req_mod(req, to_be_send);
+               _req_mod(req, TO_BE_SENT);
        if (local)
-               _req_mod(req, to_be_submitted);
-
-       /* check this request on the collision detection hash tables.
-        * if we have a conflict, just complete it here.
-        * THINK do we want to check reads, too? (I don't think so...) */
-       if (rw == WRITE && _req_conflicts(req))
-               goto fail_conflicting;
+               _req_mod(req, TO_BE_SUBMITTED);
 
-       list_add_tail(&req->tl_requests, &mdev->newest_tle->requests);
+       list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
 
        /* NOTE remote first: to get the concurrent write detection right,
         * we must register the request before start of local IO.  */
@@ -961,23 +922,23 @@ allocate_barrier:
                 * or READ, but not in sync.
                 */
                _req_mod(req, (rw == WRITE)
-                               ? queue_for_net_write
-                               : queue_for_net_read);
+                               ? QUEUE_FOR_NET_WRITE
+                               : QUEUE_FOR_NET_READ);
        }
        if (send_oos && drbd_set_out_of_sync(mdev, sector, size))
-               _req_mod(req, queue_for_send_oos);
+               _req_mod(req, QUEUE_FOR_SEND_OOS);
 
        if (remote &&
-           mdev->net_conf->on_congestion != OC_BLOCK && mdev->agreed_pro_version >= 96) {
+           mdev->tconn->net_conf->on_congestion != OC_BLOCK && mdev->tconn->agreed_pro_version >= 96) {
                int congested = 0;
 
-               if (mdev->net_conf->cong_fill &&
-                   atomic_read(&mdev->ap_in_flight) >= mdev->net_conf->cong_fill) {
+               if (mdev->tconn->net_conf->cong_fill &&
+                   atomic_read(&mdev->ap_in_flight) >= mdev->tconn->net_conf->cong_fill) {
                        dev_info(DEV, "Congestion-fill threshold reached\n");
                        congested = 1;
                }
 
-               if (mdev->act_log->used >= mdev->net_conf->cong_extents) {
+               if (mdev->act_log->used >= mdev->tconn->net_conf->cong_extents) {
                        dev_info(DEV, "Congestion-extents threshold reached\n");
                        congested = 1;
                }
@@ -985,14 +946,14 @@ allocate_barrier:
                if (congested) {
                        queue_barrier(mdev); /* last barrier, after mirrored writes */
 
-                       if (mdev->net_conf->on_congestion == OC_PULL_AHEAD)
+                       if (mdev->tconn->net_conf->on_congestion == OC_PULL_AHEAD)
                                _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
-                       else  /*mdev->net_conf->on_congestion == OC_DISCONNECT */
+                       else  /*mdev->tconn->net_conf->on_congestion == OC_DISCONNECT */
                                _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
                }
        }
 
-       spin_unlock_irq(&mdev->req_lock);
+       spin_unlock_irq(&mdev->tconn->req_lock);
        kfree(b); /* if someone else has beaten us to it... */
 
        if (local) {
@@ -1017,21 +978,6 @@ allocate_barrier:
 
        return 0;
 
-fail_conflicting:
-       /* this is a conflicting request.
-        * even though it may have been only _partially_
-        * overlapping with one of the currently pending requests,
-        * without even submitting or sending it, we will
-        * pretend that it was successfully served right now.
-        */
-       _drbd_end_io_acct(mdev, req);
-       spin_unlock_irq(&mdev->req_lock);
-       if (remote)
-               dec_ap_pending(mdev);
-       /* THINK: do we want to fail it (-EIO), or pretend success?
-        * this pretends success. */
-       err = 0;
-
 fail_free_complete:
        if (req->rq_state & RQ_IN_ACT_LOG)
                drbd_al_complete_io(mdev, sector);
@@ -1051,46 +997,19 @@ fail_and_free_req:
        return ret;
 }
 
-/* helper function for drbd_make_request
- * if we can determine just by the mdev (state) that this request will fail,
- * return 1
- * otherwise return 0
- */
-static int drbd_fail_request_early(struct drbd_conf *mdev, int is_write)
-{
-       if (mdev->state.role != R_PRIMARY &&
-               (!allow_oos || is_write)) {
-               if (__ratelimit(&drbd_ratelimit_state)) {
-                       dev_err(DEV, "Process %s[%u] tried to %s; "
-                           "since we are not in Primary state, "
-                           "we cannot allow this\n",
-                           current->comm, current->pid,
-                           is_write ? "WRITE" : "READ");
-               }
-               return 1;
-       }
-
-       return 0;
-}
-
 int drbd_make_request(struct request_queue *q, struct bio *bio)
 {
        unsigned int s_enr, e_enr;
        struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
        unsigned long start_time;
 
-       if (drbd_fail_request_early(mdev, bio_data_dir(bio) & WRITE)) {
-               bio_endio(bio, -EPERM);
-               return 0;
-       }
-
        start_time = jiffies;
 
        /*
         * what we "blindly" assume:
         */
        D_ASSERT(bio->bi_size > 0);
-       D_ASSERT((bio->bi_size & 0x1ff) == 0);
+       D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
        D_ASSERT(bio->bi_idx == 0);
 
        /* to make some things easier, force alignment of requests within the
@@ -1100,7 +1019,7 @@ int drbd_make_request(struct request_queue *q, struct bio *bio)
 
        if (likely(s_enr == e_enr)) {
                inc_ap_bio(mdev, 1);
-               return drbd_make_request_common(mdev, bio, start_time);
+               return __drbd_make_request(mdev, bio, start_time);
        }
 
        /* can this bio be split generically?
@@ -1138,10 +1057,10 @@ int drbd_make_request(struct request_queue *q, struct bio *bio)
 
                D_ASSERT(e_enr == s_enr + 1);
 
-               while (drbd_make_request_common(mdev, &bp->bio1, start_time))
+               while (__drbd_make_request(mdev, &bp->bio1, start_time))
                        inc_ap_bio(mdev, 1);
 
-               while (drbd_make_request_common(mdev, &bp->bio2, start_time))
+               while (__drbd_make_request(mdev, &bp->bio2, start_time))
                        inc_ap_bio(mdev, 1);
 
                dec_ap_bio(mdev);
@@ -1198,17 +1117,17 @@ void request_timer_fn(unsigned long data)
        struct list_head *le;
        unsigned long et = 0; /* effective timeout = ko_count * timeout */
 
-       if (get_net_conf(mdev)) {
-               et = mdev->net_conf->timeout*HZ/10 * mdev->net_conf->ko_count;
-               put_net_conf(mdev);
+       if (get_net_conf(mdev->tconn)) {
+               et = mdev->tconn->net_conf->timeout*HZ/10 * mdev->tconn->net_conf->ko_count;
+               put_net_conf(mdev->tconn);
        }
        if (!et || mdev->state.conn < C_WF_REPORT_PARAMS)
                return; /* Recurring timer stopped */
 
-       spin_lock_irq(&mdev->req_lock);
-       le = &mdev->oldest_tle->requests;
+       spin_lock_irq(&mdev->tconn->req_lock);
+       le = &mdev->tconn->oldest_tle->requests;
        if (list_empty(le)) {
-               spin_unlock_irq(&mdev->req_lock);
+               spin_unlock_irq(&mdev->tconn->req_lock);
                mod_timer(&mdev->request_timer, jiffies + et);
                return;
        }
@@ -1227,5 +1146,5 @@ void request_timer_fn(unsigned long data)
                mod_timer(&mdev->request_timer, req->start_time + et);
        }
 
-       spin_unlock_irq(&mdev->req_lock);
+       spin_unlock_irq(&mdev->tconn->req_lock);
 }