xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
authorChuck Lever <chuck.lever@oracle.com>
Fri, 4 Mar 2016 16:28:36 +0000 (11:28 -0500)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Mon, 14 Mar 2016 18:56:04 +0000 (14:56 -0400)
Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2498e
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.

xprtrdma's reply processing is already handled in a work queue, but
there is some initial order-dependent processing that is done in the
soft IRQ context before a work item is scheduled.

IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma
receive code path.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Devesh Sharma <devesh.sharma@broadcom.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index fc1ef5f144b866f752000db2603bf0b668af2fad..05779f48745b701355813d6f0dd7e0e5b780ee52 100644 (file)
@@ -212,11 +212,18 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
        atomic_set(&buffer->rb_credits, credits);
 }
 
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
+ *
+ */
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct rpcrdma_rep *rep =
-                       (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+                                              rr_cqe);
 
        /* WARNING: Only wr_id and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS)
@@ -242,55 +249,20 @@ out_schedule:
 
 out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
-               pr_err("RPC:       %s: rep %p: %s\n",
-                      __func__, rep, ib_wc_status_msg(wc->status));
+               pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
+                      ib_wc_status_msg(wc->status),
+                      wc->status, wc->vendor_err);
        rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
 }
 
-/* The wc array is on stack: automatic memory is always CPU-local.
- *
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
-       struct ib_wc *pos, wcs[4];
-       int count, rc;
-
-       do {
-               pos = wcs;
-
-               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-               if (rc < 0)
-                       break;
-
-               count = rc;
-               while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(pos++);
-       } while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
- */
-static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
-{
-       do {
-               rpcrdma_recvcq_poll(cq);
-       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
        struct ib_wc wc;
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc);
+               rpcrdma_receive_wc(NULL, &wc);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
 }
@@ -655,9 +627,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                goto out2;
        }
 
-       cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
-       recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+       recvcq = ib_alloc_cq(ia->ri_device, NULL,
+                            ep->rep_attr.cap.max_recv_wr + 1,
+                            0, IB_POLL_SOFTIRQ);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -665,14 +637,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                goto out2;
        }
 
-       rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
-       if (rc) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               ib_destroy_cq(recvcq);
-               goto out2;
-       }
-
        ep->rep_attr.send_cq = sendcq;
        ep->rep_attr.recv_cq = recvcq;
 
@@ -735,10 +699,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                ia->ri_id->qp = NULL;
        }
 
-       rc = ib_destroy_cq(ep->rep_attr.recv_cq);
-       if (rc)
-               dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-                       __func__, rc);
+       ib_free_cq(ep->rep_attr.recv_cq);
 
        rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
@@ -947,6 +908,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
        }
 
        rep->rr_device = ia->ri_device;
+       rep->rr_cqe.done = rpcrdma_receive_wc;
        rep->rr_rxprt = r_xprt;
        INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
@@ -1322,7 +1284,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        int rc;
 
        recv_wr.next = NULL;
-       recv_wr.wr_id = (u64) (unsigned long) rep;
+       recv_wr.wr_cqe = &rep->rr_cqe;
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;
 
index 7bf6f43fa4b9654dcc7f33255f310852675f2d95..d60feb9aa6319c7816496ebc62e1374be76aeffd 100644 (file)
@@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 struct rpcrdma_buffer;
 
 struct rpcrdma_rep {
+       struct ib_cqe           rr_cqe;
        unsigned int            rr_len;
        struct ib_device        *rr_device;
        struct rpcrdma_xprt     *rr_rxprt;