xprtrdma: Split the completion queue
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/pci.h>  /* for Tavor hack below */
52 #include <linux/slab.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * Replies are handled in tasklet context, using a single, global list.
70  * The rdma tasklet function simply walks that list and invokes the
71  * reply callback (rr_func) for each reply it finds.
72  */
73
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* the tasklet argument is not used */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
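/*
 * Illustrative use of the deferral above (a sketch, not code taken from
 * this file): a receive completion that has identified a reply hands the
 * RPC-level work off to the tasklet rather than doing it in CQ upcall
 * context, e.g.
 *
 *	rep->rr_func = rpcrdma_reply_handler;	(set elsewhere in the transport)
 *	rpcrdma_schedule_tasklet(rep);
 *
 * rpcrdma_run_tasklet() then invokes rr_func, or returns the buffer to
 * the pool when no callback is set.
 */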
116
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151                 __func__, frmr, wc->status, wc->opcode);
152
153         if (wc->wr_id == 0ULL)
154                 return;
155         if (wc->status != IB_WC_SUCCESS)
156                 return;
157
158         if (wc->opcode == IB_WC_FAST_REG_MR)
159                 frmr->r.frmr.state = FRMR_IS_VALID;
160         else if (wc->opcode == IB_WC_LOCAL_INV)
161                 frmr->r.frmr.state = FRMR_IS_INVALID;
162 }
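/*
 * For send-side completions, wr_id either carries a pointer to the
 * rpcrdma_mw whose FAST_REG_MR or LOCAL_INV work request just finished,
 * or is zero (the check above), in which case there is no MW state to
 * update -- presumably an ordinary RDMA SEND completion.
 */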
163
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq)
166 {
167         struct ib_wc wc;
168         int rc;
169
170         while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
171                 rpcrdma_sendcq_process_wc(&wc);
172         return rc;
173 }
174
175 /*
176  * Handle send, fast_reg_mr, and local_inv completions.
177  *
178  * Send events are typically suppressed and thus do not result
179  * in an upcall. Occasionally one is signaled, however. This
180  * prevents the provider's completion queue from wrapping and
181  * losing a completion.
182  */
183 static void
184 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
185 {
186         int rc;
187
188         rc = rpcrdma_sendcq_poll(cq);
189         if (rc) {
190                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
191                         __func__, rc);
192                 return;
193         }
194
195         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
196         if (rc) {
197                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
198                         __func__, rc);
199                 return;
200         }
201
202         rpcrdma_sendcq_poll(cq);
203 }
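/*
 * Note on the poll -> re-arm -> poll pattern above: a completion that
 * arrives after the first ib_poll_cq() pass drains the CQ but before
 * ib_req_notify_cq() re-arms it would generate no upcall and could sit
 * in the queue indefinitely. Polling once more after re-arming picks up
 * any such straggler.
 */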
204
205 static void
206 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
207 {
208         struct rpcrdma_rep *rep =
209                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
210
211         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
212                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
213
214         if (wc->status != IB_WC_SUCCESS) {
215                 rep->rr_len = ~0U;
216                 goto out_schedule;
217         }
218         if (wc->opcode != IB_WC_RECV)
219                 return;
220
221         rep->rr_len = wc->byte_len;
222         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
223                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
224
225         if (rep->rr_len >= 16) {
226                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
227                 unsigned int credits = ntohl(p->rm_credit);
228
229                 if (credits == 0)
230                         credits = 1;    /* don't deadlock */
231                 else if (credits > rep->rr_buffer->rb_max_requests)
232                         credits = rep->rr_buffer->rb_max_requests;
233                 atomic_set(&rep->rr_buffer->rb_credits, credits);
234         }
235
236 out_schedule:
237         rpcrdma_schedule_tasklet(rep);
238 }
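/*
 * Credit handling sketch: a reply of at least 16 bytes contains the
 * fixed portion of the RPC-over-RDMA header, whose rm_credit field
 * advertises how many requests the server will accept. The clamping
 * above keeps the value in [1, rb_max_requests], so a bogus
 * advertisement can neither deadlock the transport (0 credits) nor
 * exceed the number of posted receive buffers.
 */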
239
240 static int
241 rpcrdma_recvcq_poll(struct ib_cq *cq)
242 {
243         struct ib_wc wc;
244         int rc;
245
246         while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
247                 rpcrdma_recvcq_process_wc(&wc);
248         return rc;
249 }
250
251 /*
252  * Handle receive completions.
253  *
254  * The handler is reentrant, but it processes one completion at a time
255  * to preserve receive ordering, on which server credit accounting relies.
256  *
257  * It is the responsibility of the scheduled tasklet to return
258  * recv buffers to the pool. NOTE: this affects synchronization of
259  * connection shutdown. That is, the structures required for
260  * the completion of the reply handler must remain intact until
261  * all memory has been reclaimed.
262  */
263 static void
264 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
265 {
266         int rc;
267
268         rc = rpcrdma_recvcq_poll(cq);
269         if (rc) {
270                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
271                         __func__, rc);
272                 return;
273         }
274
275         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
276         if (rc) {
277                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
278                         __func__, rc);
279                 return;
280         }
281
282         rpcrdma_recvcq_poll(cq);
283 }
284
285 #ifdef RPC_DEBUG
286 static const char * const conn[] = {
287         "address resolved",
288         "address error",
289         "route resolved",
290         "route error",
291         "connect request",
292         "connect response",
293         "connect error",
294         "unreachable",
295         "rejected",
296         "established",
297         "disconnected",
298         "device removal"
299 };
300 #endif
301
302 static int
303 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
304 {
305         struct rpcrdma_xprt *xprt = id->context;
306         struct rpcrdma_ia *ia = &xprt->rx_ia;
307         struct rpcrdma_ep *ep = &xprt->rx_ep;
308 #ifdef RPC_DEBUG
309         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
310 #endif
311         struct ib_qp_attr attr;
312         struct ib_qp_init_attr iattr;
313         int connstate = 0;
314
315         switch (event->event) {
316         case RDMA_CM_EVENT_ADDR_RESOLVED:
317         case RDMA_CM_EVENT_ROUTE_RESOLVED:
318                 ia->ri_async_rc = 0;
319                 complete(&ia->ri_done);
320                 break;
321         case RDMA_CM_EVENT_ADDR_ERROR:
322                 ia->ri_async_rc = -EHOSTUNREACH;
323                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
324                         __func__, ep);
325                 complete(&ia->ri_done);
326                 break;
327         case RDMA_CM_EVENT_ROUTE_ERROR:
328                 ia->ri_async_rc = -ENETUNREACH;
329                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
330                         __func__, ep);
331                 complete(&ia->ri_done);
332                 break;
333         case RDMA_CM_EVENT_ESTABLISHED:
334                 connstate = 1;
335                 ib_query_qp(ia->ri_id->qp, &attr,
336                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
337                         &iattr);
338                 dprintk("RPC:       %s: %d responder resources"
339                         " (%d initiator)\n",
340                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
341                 goto connected;
342         case RDMA_CM_EVENT_CONNECT_ERROR:
343                 connstate = -ENOTCONN;
344                 goto connected;
345         case RDMA_CM_EVENT_UNREACHABLE:
346                 connstate = -ENETDOWN;
347                 goto connected;
348         case RDMA_CM_EVENT_REJECTED:
349                 connstate = -ECONNREFUSED;
350                 goto connected;
351         case RDMA_CM_EVENT_DISCONNECTED:
352                 connstate = -ECONNABORTED;
353                 goto connected;
354         case RDMA_CM_EVENT_DEVICE_REMOVAL:
355                 connstate = -ENODEV;
356 connected:
357                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
358                         __func__,
359                         (event->event <= 11) ? conn[event->event] :
360                                                 "unknown connection error",
361                         &addr->sin_addr.s_addr,
362                         ntohs(addr->sin_port),
363                         ep, event->event);
364                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
365                 dprintk("RPC:       %s: %sconnected\n",
366                                         __func__, connstate > 0 ? "" : "dis");
367                 ep->rep_connected = connstate;
368                 ep->rep_func(ep);
369                 wake_up_all(&ep->rep_connect_wait);
370                 break;
371         default:
372                 dprintk("RPC:       %s: unexpected CM event %d\n",
373                         __func__, event->event);
374                 break;
375         }
376
377 #ifdef RPC_DEBUG
378         if (connstate == 1) {
379                 int ird = attr.max_dest_rd_atomic;
380                 int tird = ep->rep_remote_cma.responder_resources;
381                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
382                         "on %s, memreg %d slots %d ird %d%s\n",
383                         &addr->sin_addr.s_addr,
384                         ntohs(addr->sin_port),
385                         ia->ri_id->device->name,
386                         ia->ri_memreg_strategy,
387                         xprt->rx_buf.rb_max_requests,
388                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
389         } else if (connstate < 0) {
390                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
391                         &addr->sin_addr.s_addr,
392                         ntohs(addr->sin_port),
393                         connstate);
394         }
395 #endif
396
397         return 0;
398 }
399
400 static struct rdma_cm_id *
401 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
402                         struct rpcrdma_ia *ia, struct sockaddr *addr)
403 {
404         struct rdma_cm_id *id;
405         int rc;
406
407         init_completion(&ia->ri_done);
408
409         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
410         if (IS_ERR(id)) {
411                 rc = PTR_ERR(id);
412                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
413                         __func__, rc);
414                 return id;
415         }
416
417         ia->ri_async_rc = -ETIMEDOUT;
418         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
419         if (rc) {
420                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
421                         __func__, rc);
422                 goto out;
423         }
424         wait_for_completion_interruptible_timeout(&ia->ri_done,
425                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
426         rc = ia->ri_async_rc;
427         if (rc)
428                 goto out;
429
430         ia->ri_async_rc = -ETIMEDOUT;
431         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
432         if (rc) {
433                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
434                         __func__, rc);
435                 goto out;
436         }
437         wait_for_completion_interruptible_timeout(&ia->ri_done,
438                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
439         rc = ia->ri_async_rc;
440         if (rc)
441                 goto out;
442
443         return id;
444
445 out:
446         rdma_destroy_id(id);
447         return ERR_PTR(rc);
448 }
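/*
 * Both resolution steps above are asynchronous: rdma_resolve_addr() and
 * rdma_resolve_route() return immediately and the outcome is delivered
 * to rpcrdma_conn_upcall(), which records it in ri_async_rc and
 * completes ri_done. Pre-loading ri_async_rc with -ETIMEDOUT means a
 * wait that expires without an upcall is reported as a timeout with no
 * extra bookkeeping.
 */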
449
450 /*
451  * Drain any cq, prior to teardown.
452  */
453 static void
454 rpcrdma_clean_cq(struct ib_cq *cq)
455 {
456         struct ib_wc wc;
457         int count = 0;
458
459         while (1 == ib_poll_cq(cq, 1, &wc))
460                 ++count;
461
462         if (count)
463                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
464                         __func__, count, wc.opcode);
465 }
466
467 /*
468  * Exported functions.
469  */
470
471 /*
472  * Open and initialize an Interface Adapter.
473  *  o initializes fields of struct rpcrdma_ia, including
474  *    interface and provider attributes and protection zone.
475  */
476 int
477 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
478 {
479         int rc, mem_priv;
480         struct ib_device_attr devattr;
481         struct rpcrdma_ia *ia = &xprt->rx_ia;
482
483         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
484         if (IS_ERR(ia->ri_id)) {
485                 rc = PTR_ERR(ia->ri_id);
486                 goto out1;
487         }
488
489         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
490         if (IS_ERR(ia->ri_pd)) {
491                 rc = PTR_ERR(ia->ri_pd);
492                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
493                         __func__, rc);
494                 goto out2;
495         }
496
497         /*
498          * Query the device to determine if the requested memory
499          * registration strategy is supported. If it isn't, set the
500          * strategy to a globally supported model.
501          */
502         rc = ib_query_device(ia->ri_id->device, &devattr);
503         if (rc) {
504                 dprintk("RPC:       %s: ib_query_device failed %d\n",
505                         __func__, rc);
506                 goto out2;
507         }
508
509         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
510                 ia->ri_have_dma_lkey = 1;
511                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
512         }
513
514         if (memreg == RPCRDMA_FRMR) {
515                 /* Requires both frmr reg and local dma lkey */
516                 if ((devattr.device_cap_flags &
517                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
518                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
519                         dprintk("RPC:       %s: FRMR registration "
520                                 "not supported by HCA\n", __func__);
521                         memreg = RPCRDMA_MTHCAFMR;
522                 } else {
523                         /* Mind the ia limit on FRMR page list depth */
524                         ia->ri_max_frmr_depth = min_t(unsigned int,
525                                 RPCRDMA_MAX_DATA_SEGS,
526                                 devattr.max_fast_reg_page_list_len);
527                 }
528         }
529         if (memreg == RPCRDMA_MTHCAFMR) {
530                 if (!ia->ri_id->device->alloc_fmr) {
531                         dprintk("RPC:       %s: MTHCAFMR registration "
532                                 "not supported by HCA\n", __func__);
533 #if RPCRDMA_PERSISTENT_REGISTRATION
534                         memreg = RPCRDMA_ALLPHYSICAL;
535 #else
536                         rc = -ENOMEM;
537                         goto out2;
538 #endif
539                 }
540         }
541
542         /*
543          * Optionally obtain an underlying physical identity mapping in
544          * order to do a memory window-based bind. This base registration
545          * is protected from remote access - that is enabled only by binding
546          * for the specific bytes targeted during each RPC operation, and
547          * revoked after the corresponding completion similar to a storage
548          * adapter.
549          */
550         switch (memreg) {
551         case RPCRDMA_FRMR:
552                 break;
553 #if RPCRDMA_PERSISTENT_REGISTRATION
554         case RPCRDMA_ALLPHYSICAL:
555                 mem_priv = IB_ACCESS_LOCAL_WRITE |
556                                 IB_ACCESS_REMOTE_WRITE |
557                                 IB_ACCESS_REMOTE_READ;
558                 goto register_setup;
559 #endif
560         case RPCRDMA_MTHCAFMR:
561                 if (ia->ri_have_dma_lkey)
562                         break;
563                 mem_priv = IB_ACCESS_LOCAL_WRITE;
564 #if RPCRDMA_PERSISTENT_REGISTRATION
565         register_setup:
566 #endif
567                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
568                 if (IS_ERR(ia->ri_bind_mem)) {
569                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
570                                 "phys register failed with %lX\n",
571                                 __func__, PTR_ERR(ia->ri_bind_mem));
572                         rc = -ENOMEM;
573                         goto out2;
574                 }
575                 break;
576         default:
577                 printk(KERN_ERR "RPC: Unsupported memory "
578                                 "registration mode: %d\n", memreg);
579                 rc = -ENOMEM;
580                 goto out2;
581         }
582         dprintk("RPC:       %s: memory registration strategy is %d\n",
583                 __func__, memreg);
584
585         /* Else will do memory reg/dereg for each chunk */
586         ia->ri_memreg_strategy = memreg;
587
588         return 0;
589 out2:
590         rdma_destroy_id(ia->ri_id);
591         ia->ri_id = NULL;
592 out1:
593         return rc;
594 }
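/*
 * Summary of the fallback order implemented above when the caller asks
 * for RPCRDMA_FRMR:
 *
 *	FRMR        - needs IB_DEVICE_MEM_MGT_EXTENSIONS and a local DMA lkey
 *	MTHCAFMR    - needs the provider to implement alloc_fmr
 *	ALLPHYSICAL - only if RPCRDMA_PERSISTENT_REGISTRATION is enabled
 *
 * If none of these is usable, the open fails with -ENOMEM.
 */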
595
596 /*
597  * Clean up/close an IA.
598  *   o if event handles and PD have been initialized, free them.
599  *   o close the IA
600  */
601 void
602 rpcrdma_ia_close(struct rpcrdma_ia *ia)
603 {
604         int rc;
605
606         dprintk("RPC:       %s: entering\n", __func__);
607         if (ia->ri_bind_mem != NULL) {
608                 rc = ib_dereg_mr(ia->ri_bind_mem);
609                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
610                         __func__, rc);
611         }
612         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
613                 if (ia->ri_id->qp)
614                         rdma_destroy_qp(ia->ri_id);
615                 rdma_destroy_id(ia->ri_id);
616                 ia->ri_id = NULL;
617         }
618         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
619                 rc = ib_dealloc_pd(ia->ri_pd);
620                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
621                         __func__, rc);
622         }
623 }
624
625 /*
626  * Create unconnected endpoint.
627  */
628 int
629 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
630                                 struct rpcrdma_create_data_internal *cdata)
631 {
632         struct ib_device_attr devattr;
633         struct ib_cq *sendcq, *recvcq;
634         int rc, err;
635
636         rc = ib_query_device(ia->ri_id->device, &devattr);
637         if (rc) {
638                 dprintk("RPC:       %s: ib_query_device failed %d\n",
639                         __func__, rc);
640                 return rc;
641         }
642
643         /* check provider's send/recv wr limits */
644         if (cdata->max_requests > devattr.max_qp_wr)
645                 cdata->max_requests = devattr.max_qp_wr;
646
647         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
648         ep->rep_attr.qp_context = ep;
649         /* send_cq and recv_cq initialized below */
650         ep->rep_attr.srq = NULL;
651         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
652         switch (ia->ri_memreg_strategy) {
653         case RPCRDMA_FRMR: {
654                 int depth = 7;
655
656                 /* Add room for frmr register and invalidate WRs.
657                  * 1. FRMR reg WR for head
658                  * 2. FRMR invalidate WR for head
659                  * 3. N FRMR reg WRs for pagelist
660                  * 4. N FRMR invalidate WRs for pagelist
661                  * 5. FRMR reg WR for tail
662                  * 6. FRMR invalidate WR for tail
663                  * 7. The RDMA_SEND WR
664                  */
665
666                 /* Calculate N if the device max FRMR depth is smaller than
667                  * RPCRDMA_MAX_DATA_SEGS.
668                  */
669                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
670                         int delta = RPCRDMA_MAX_DATA_SEGS -
671                                     ia->ri_max_frmr_depth;
672
673                         do {
674                                 depth += 2; /* FRMR reg + invalidate */
675                                 delta -= ia->ri_max_frmr_depth;
676                         } while (delta > 0);
677
678                 }
679                 ep->rep_attr.cap.max_send_wr *= depth;
680                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
681                         cdata->max_requests = devattr.max_qp_wr / depth;
682                         if (!cdata->max_requests)
683                                 return -EINVAL;
684                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
685                                                        depth;
686                 }
687                 break;
688         }
689         default:
690                 break;
691         }
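/*
 * Worked example of the send queue sizing above (numbers chosen for
 * illustration only): if RPCRDMA_MAX_DATA_SEGS were 64 and the device
 * capped ri_max_frmr_depth at 16, delta would start at 48 and the loop
 * would add 2 WRs three times, giving depth == 13. max_send_wr is then
 * multiplied by that depth, and max_requests is scaled back whenever
 * the product exceeds devattr.max_qp_wr.
 */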
692         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
693         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
694         ep->rep_attr.cap.max_recv_sge = 1;
695         ep->rep_attr.cap.max_inline_data = 0;
696         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
697         ep->rep_attr.qp_type = IB_QPT_RC;
698         ep->rep_attr.port_num = ~0;
699
700         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
701                 "iovs: send %d recv %d\n",
702                 __func__,
703                 ep->rep_attr.cap.max_send_wr,
704                 ep->rep_attr.cap.max_recv_wr,
705                 ep->rep_attr.cap.max_send_sge,
706                 ep->rep_attr.cap.max_recv_sge);
707
708         /* set trigger for requesting send completion */
709         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
710         if (ep->rep_cqinit <= 2)
711                 ep->rep_cqinit = 0;
712         INIT_CQCOUNT(ep);
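/*
 * How this trigger is consumed (a sketch based on the INIT_CQCOUNT and
 * DECR_CQCOUNT macros from xprt_rdma.h): the send path decrements a
 * per-endpoint counter for each work request and requests a signaled
 * completion only when the counter runs out, at which point it is reset
 * to rep_cqinit. With rep_cqinit near half of max_send_wr, roughly one
 * send per half-queue's worth is signaled, which keeps the send CQ from
 * wrapping while suppressing most upcalls.
 */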
713         ep->rep_ia = ia;
714         init_waitqueue_head(&ep->rep_connect_wait);
715         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
716
717         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
718                                   rpcrdma_cq_async_error_upcall, NULL,
719                                   ep->rep_attr.cap.max_send_wr + 1, 0);
720         if (IS_ERR(sendcq)) {
721                 rc = PTR_ERR(sendcq);
722                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
723                         __func__, rc);
724                 goto out1;
725         }
726
727         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
728         if (rc) {
729                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
730                         __func__, rc);
731                 goto out2;
732         }
733
734         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
735                                   rpcrdma_cq_async_error_upcall, NULL,
736                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
737         if (IS_ERR(recvcq)) {
738                 rc = PTR_ERR(recvcq);
739                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
740                         __func__, rc);
741                 goto out2;
742         }
743
744         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
745         if (rc) {
746                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
747                         __func__, rc);
748                 ib_destroy_cq(recvcq);
749                 goto out2;
750         }
751
752         ep->rep_attr.send_cq = sendcq;
753         ep->rep_attr.recv_cq = recvcq;
754
755         /* Initialize cma parameters */
756
757         /* RPC/RDMA does not use private data */
758         ep->rep_remote_cma.private_data = NULL;
759         ep->rep_remote_cma.private_data_len = 0;
760
761         /* Client offers RDMA Read but does not initiate */
762         ep->rep_remote_cma.initiator_depth = 0;
763         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
764                 ep->rep_remote_cma.responder_resources = 32;
765         else
766                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
767
768         ep->rep_remote_cma.retry_count = 7;
769         ep->rep_remote_cma.flow_control = 0;
770         ep->rep_remote_cma.rnr_retry_count = 0;
771
772         return 0;
773
774 out2:
775         err = ib_destroy_cq(sendcq);
776         if (err)
777                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
778                         __func__, err);
779 out1:
780         return rc;
781 }
782
783 /*
784  * rpcrdma_ep_destroy
785  *
786  * Disconnect and destroy endpoint. After this, the only
787  * valid operations on the ep are to free it (if dynamically
788  * allocated) or re-create it.
789  */
790 void
791 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
792 {
793         int rc;
794
795         dprintk("RPC:       %s: entering, connected is %d\n",
796                 __func__, ep->rep_connected);
797
798         cancel_delayed_work_sync(&ep->rep_connect_worker);
799
800         if (ia->ri_id->qp) {
801                 rc = rpcrdma_ep_disconnect(ep, ia);
802                 if (rc)
803                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
804                                 " returned %i\n", __func__, rc);
805                 rdma_destroy_qp(ia->ri_id);
806                 ia->ri_id->qp = NULL;
807         }
808
809         /* padding - could be done in rpcrdma_buffer_destroy... */
810         if (ep->rep_pad_mr) {
811                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
812                 ep->rep_pad_mr = NULL;
813         }
814
815         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
816         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
817         if (rc)
818                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
819                         __func__, rc);
820
821         rpcrdma_clean_cq(ep->rep_attr.send_cq);
822         rc = ib_destroy_cq(ep->rep_attr.send_cq);
823         if (rc)
824                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
825                         __func__, rc);
826 }
827
828 /*
829  * Connect unconnected endpoint.
830  */
831 int
832 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
833 {
834         struct rdma_cm_id *id;
835         int rc = 0;
836         int retry_count = 0;
837
838         if (ep->rep_connected != 0) {
839                 struct rpcrdma_xprt *xprt;
840 retry:
841                 rc = rpcrdma_ep_disconnect(ep, ia);
842                 if (rc && rc != -ENOTCONN)
843                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
844                                 " status %i\n", __func__, rc);
845
846                 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
847                 rpcrdma_clean_cq(ep->rep_attr.send_cq);
848
849                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
850                 id = rpcrdma_create_id(xprt, ia,
851                                 (struct sockaddr *)&xprt->rx_data.addr);
852                 if (IS_ERR(id)) {
853                         rc = PTR_ERR(id);
854                         goto out;
855                 }
856                 /* TEMP TEMP TEMP - fail if new device:
857                  * Deregister/remarshal *all* requests!
858                  * Close and recreate adapter, pd, etc!
859                  * Re-determine all attributes still sane!
860                  * More stuff I haven't thought of!
861                  * Rrrgh!
862                  */
863                 if (ia->ri_id->device != id->device) {
864                         printk("RPC:       %s: can't reconnect on "
865                                 "different device!\n", __func__);
866                         rdma_destroy_id(id);
867                         rc = -ENETDOWN;
868                         goto out;
869                 }
870                 /* END TEMP */
871                 rdma_destroy_qp(ia->ri_id);
872                 rdma_destroy_id(ia->ri_id);
873                 ia->ri_id = id;
874         }
875
876         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
877         if (rc) {
878                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
879                         __func__, rc);
880                 goto out;
881         }
882
883 /* XXX Tavor device performs badly with 2K MTU! */
884 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
885         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
886         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
887             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
888              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
889                 struct ib_qp_attr attr = {
890                         .path_mtu = IB_MTU_1024
891                 };
892                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
893         }
894 }
895
896         ep->rep_connected = 0;
897
898         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
899         if (rc) {
900                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
901                                 __func__, rc);
902                 goto out;
903         }
904
905         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
906
907         /*
908          * Check state. A non-peer reject indicates no listener
909          * (ECONNREFUSED), which may be a transient state. All
910          * others indicate a transport condition which has already
911  * undergone a best-effort recovery attempt.
912          */
913         if (ep->rep_connected == -ECONNREFUSED &&
914             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
915                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
916                 goto retry;
917         }
918         if (ep->rep_connected <= 0) {
919                 /* Sometimes, the only way to reliably connect to remote
920                  * CMs is to use same nonzero values for ORD and IRD. */
921                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
922                     (ep->rep_remote_cma.responder_resources == 0 ||
923                      ep->rep_remote_cma.initiator_depth !=
924                                 ep->rep_remote_cma.responder_resources)) {
925                         if (ep->rep_remote_cma.responder_resources == 0)
926                                 ep->rep_remote_cma.responder_resources = 1;
927                         ep->rep_remote_cma.initiator_depth =
928                                 ep->rep_remote_cma.responder_resources;
929                         goto retry;
930                 }
931                 rc = ep->rep_connected;
932         } else {
933                 dprintk("RPC:       %s: connected\n", __func__);
934         }
935
936 out:
937         if (rc)
938                 ep->rep_connected = rc;
939         return rc;
940 }
941
942 /*
943  * rpcrdma_ep_disconnect
944  *
945  * This is separate from destroy to facilitate the ability
946  * to reconnect without recreating the endpoint.
947  *
948  * This call is not reentrant, and must not be made in parallel
949  * on the same endpoint.
950  */
951 int
952 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
953 {
954         int rc;
955
956         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
957         rpcrdma_clean_cq(ep->rep_attr.send_cq);
958         rc = rdma_disconnect(ia->ri_id);
959         if (!rc) {
960                 /* returns without wait if not connected */
961                 wait_event_interruptible(ep->rep_connect_wait,
962                                                         ep->rep_connected != 1);
963                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
964                         (ep->rep_connected == 1) ? "still " : "dis");
965         } else {
966                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
967                 ep->rep_connected = rc;
968         }
969         return rc;
970 }
971
972 /*
973  * Initialize buffer memory
974  */
975 int
976 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
977         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
978 {
979         char *p;
980         size_t len;
981         int i, rc;
982         struct rpcrdma_mw *r;
983
984         buf->rb_max_requests = cdata->max_requests;
985         spin_lock_init(&buf->rb_lock);
986         atomic_set(&buf->rb_credits, 1);
987
988         /* Need to allocate:
989          *   1.  arrays for send and recv pointers
990          *   2.  arrays of struct rpcrdma_req to fill in pointers
991          *   3.  array of struct rpcrdma_rep for replies
992          *   4.  padding, if any
993          *   5.  mw's, fmr's or frmr's, if any
994          * Send/recv buffers in req/rep need to be registered
995          */
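/*
 * Rough layout of the single rb_pool allocation sized below (items 2
 * and 3 in the list above are kmalloc'ed individually later on):
 *
 *	[ rb_send_bufs[max_requests] pointer array ]
 *	[ rb_recv_bufs[max_requests] pointer array ]
 *	[ optional zeroed pad buffer of cdata->padding bytes ]
 *	[ struct rpcrdma_mw array for FRMRs or FMRs, if any ]
 */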
996
997         len = buf->rb_max_requests *
998                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
999         len += cdata->padding;
1000         switch (ia->ri_memreg_strategy) {
1001         case RPCRDMA_FRMR:
1002                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1003                                 sizeof(struct rpcrdma_mw);
1004                 break;
1005         case RPCRDMA_MTHCAFMR:
1006                 /* TBD we are perhaps overallocating here */
1007                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1008                                 sizeof(struct rpcrdma_mw);
1009                 break;
1010         default:
1011                 break;
1012         }
1013
1014         /* allocate 1, 4 and 5 in one shot */
1015         p = kzalloc(len, GFP_KERNEL);
1016         if (p == NULL) {
1017                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1018                         __func__, len);
1019                 rc = -ENOMEM;
1020                 goto out;
1021         }
1022         buf->rb_pool = p;       /* for freeing it later */
1023
1024         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1025         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1026         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1027         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1028
1029         /*
1030          * Register the zeroed pad buffer, if any.
1031          */
1032         if (cdata->padding) {
1033                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1034                                             &ep->rep_pad_mr, &ep->rep_pad);
1035                 if (rc)
1036                         goto out;
1037         }
1038         p += cdata->padding;
1039
1040         INIT_LIST_HEAD(&buf->rb_mws);
1041         r = (struct rpcrdma_mw *)p;
1042         switch (ia->ri_memreg_strategy) {
1043         case RPCRDMA_FRMR:
1044                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1045                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1046                                                 ia->ri_max_frmr_depth);
1047                         if (IS_ERR(r->r.frmr.fr_mr)) {
1048                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1049                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1050                                         " failed %i\n", __func__, rc);
1051                                 goto out;
1052                         }
1053                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1054                                                 ia->ri_id->device,
1055                                                 ia->ri_max_frmr_depth);
1056                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1057                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1058                                 dprintk("RPC:       %s: "
1059                                         "ib_alloc_fast_reg_page_list "
1060                                         "failed %i\n", __func__, rc);
1061
1062                                 ib_dereg_mr(r->r.frmr.fr_mr);
1063                                 goto out;
1064                         }
1065                         list_add(&r->mw_list, &buf->rb_mws);
1066                         ++r;
1067                 }
1068                 break;
1069         case RPCRDMA_MTHCAFMR:
1070                 /* TBD we are perhaps overallocating here */
1071                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1072                         static struct ib_fmr_attr fa =
1073                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1074                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1075                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1076                                 &fa);
1077                         if (IS_ERR(r->r.fmr)) {
1078                                 rc = PTR_ERR(r->r.fmr);
1079                                 dprintk("RPC:       %s: ib_alloc_fmr"
1080                                         " failed %i\n", __func__, rc);
1081                                 goto out;
1082                         }
1083                         list_add(&r->mw_list, &buf->rb_mws);
1084                         ++r;
1085                 }
1086                 break;
1087         default:
1088                 break;
1089         }
1090
1091         /*
1092          * Allocate/init the request/reply buffers. Doing this
1093          * using kmalloc for now -- one for each buf.
1094          */
1095         for (i = 0; i < buf->rb_max_requests; i++) {
1096                 struct rpcrdma_req *req;
1097                 struct rpcrdma_rep *rep;
1098
1099                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1100                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1101                 /* Typical ~2400b, so rounding up saves work later */
1102                 if (len < 4096)
1103                         len = 4096;
1104                 req = kmalloc(len, GFP_KERNEL);
1105                 if (req == NULL) {
1106                         dprintk("RPC:       %s: request buffer %d alloc"
1107                                 " failed\n", __func__, i);
1108                         rc = -ENOMEM;
1109                         goto out;
1110                 }
1111                 memset(req, 0, sizeof(struct rpcrdma_req));   /* zero the header only */
1112                 buf->rb_send_bufs[i] = req;
1113                 buf->rb_send_bufs[i]->rl_buffer = buf;
1114
1115                 rc = rpcrdma_register_internal(ia, req->rl_base,
1116                                 len - offsetof(struct rpcrdma_req, rl_base),
1117                                 &buf->rb_send_bufs[i]->rl_handle,
1118                                 &buf->rb_send_bufs[i]->rl_iov);
1119                 if (rc)
1120                         goto out;
1121
1122                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1123
1124                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1125                 rep = kmalloc(len, GFP_KERNEL);
1126                 if (rep == NULL) {
1127                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1128                                 __func__, i);
1129                         rc = -ENOMEM;
1130                         goto out;
1131                 }
1132                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1133                 buf->rb_recv_bufs[i] = rep;
1134                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1135
1136                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1137                                 len - offsetof(struct rpcrdma_rep, rr_base),
1138                                 &buf->rb_recv_bufs[i]->rr_handle,
1139                                 &buf->rb_recv_bufs[i]->rr_iov);
1140                 if (rc)
1141                         goto out;
1142
1143         }
1144         dprintk("RPC:       %s: max_requests %d\n",
1145                 __func__, buf->rb_max_requests);
1146         /* done */
1147         return 0;
1148 out:
1149         rpcrdma_buffer_destroy(buf);
1150         return rc;
1151 }
1152
1153 /*
1154  * Unregister and destroy buffer memory. Need to deal with
1155  * partial initialization, so it's callable from failed create.
1156  * Must be called before destroying endpoint, as registrations
1157  * reference it.
1158  */
1159 void
1160 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1161 {
1162         int rc, i;
1163         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1164         struct rpcrdma_mw *r;
1165
1166         /* clean up in reverse order from create
1167          *   1.  recv mr memory (mr free, then kfree)
1168          *   2.  send mr memory (mr free, then kfree)
1169          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1170          *   4.  arrays
1171          */
1172         dprintk("RPC:       %s: entering\n", __func__);
1173
1174         for (i = 0; i < buf->rb_max_requests; i++) {
1175                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1176                         rpcrdma_deregister_internal(ia,
1177                                         buf->rb_recv_bufs[i]->rr_handle,
1178                                         &buf->rb_recv_bufs[i]->rr_iov);
1179                         kfree(buf->rb_recv_bufs[i]);
1180                 }
1181                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1182                         rpcrdma_deregister_internal(ia,
1183                                         buf->rb_send_bufs[i]->rl_handle,
1184                                         &buf->rb_send_bufs[i]->rl_iov);
1185                         kfree(buf->rb_send_bufs[i]);
1186                 }
1187         }
1188
1189         while (!list_empty(&buf->rb_mws)) {
1190                 r = list_entry(buf->rb_mws.next,
1191                         struct rpcrdma_mw, mw_list);
1192                 list_del(&r->mw_list);
1193                 switch (ia->ri_memreg_strategy) {
1194                 case RPCRDMA_FRMR:
1195                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1196                         if (rc)
1197                                 dprintk("RPC:       %s:"
1198                                         " ib_dereg_mr"
1199                                         " failed %i\n",
1200                                         __func__, rc);
1201                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1202                         break;
1203                 case RPCRDMA_MTHCAFMR:
1204                         rc = ib_dealloc_fmr(r->r.fmr);
1205                         if (rc)
1206                                 dprintk("RPC:       %s:"
1207                                         " ib_dealloc_fmr"
1208                                         " failed %i\n",
1209                                         __func__, rc);
1210                         break;
1211                 default:
1212                         break;
1213                 }
1214         }
1215
1216         kfree(buf->rb_pool);
1217 }
1218
1219 /*
1220  * Get a set of request/reply buffers.
1221  *
1222  * Reply buffer (if needed) is attached to send buffer upon return.
1223  * Rule:
1224  *    rb_send_index and rb_recv_index MUST always be pointing to the
1225  *    *next* available buffer (non-NULL). They are incremented after
1226  *    removing buffers, and decremented *before* returning them.
1227  */
1228 struct rpcrdma_req *
1229 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1230 {
1231         struct rpcrdma_req *req;
1232         unsigned long flags;
1233         int i;
1234         struct rpcrdma_mw *r;
1235
1236         spin_lock_irqsave(&buffers->rb_lock, flags);
1237         if (buffers->rb_send_index == buffers->rb_max_requests) {
1238                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1239                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1240                 return ((struct rpcrdma_req *)NULL);
1241         }
1242
1243         req = buffers->rb_send_bufs[buffers->rb_send_index];
1244         if (buffers->rb_send_index < buffers->rb_recv_index) {
1245                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1246                         __func__,
1247                         buffers->rb_recv_index - buffers->rb_send_index);
1248                 req->rl_reply = NULL;
1249         } else {
1250                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1251                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1252         }
1253         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1254         if (!list_empty(&buffers->rb_mws)) {
1255                 i = RPCRDMA_MAX_SEGS - 1;
1256                 do {
1257                         r = list_entry(buffers->rb_mws.next,
1258                                         struct rpcrdma_mw, mw_list);
1259                         list_del(&r->mw_list);
1260                         req->rl_segments[i].mr_chunk.rl_mw = r;
1261                 } while (--i >= 0);
1262         }
1263         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1264         return req;
1265 }
1266
1267 /*
1268  * Put request/reply buffers back into pool.
1269  * Pre-decrement counter/array index.
1270  */
1271 void
1272 rpcrdma_buffer_put(struct rpcrdma_req *req)
1273 {
1274         struct rpcrdma_buffer *buffers = req->rl_buffer;
1275         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1276         int i;
1277         unsigned long flags;
1278
1279         BUG_ON(req->rl_nchunks != 0);
1280         spin_lock_irqsave(&buffers->rb_lock, flags);
1281         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1282         req->rl_niovs = 0;
1283         if (req->rl_reply) {
1284                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1285                 req->rl_reply->rr_func = NULL;
1286                 req->rl_reply = NULL;
1287         }
1288         switch (ia->ri_memreg_strategy) {
1289         case RPCRDMA_FRMR:
1290         case RPCRDMA_MTHCAFMR:
1291                 /*
1292                  * Cycle mw's back in reverse order, and "spin" them.
1293                  * This delays and scrambles reuse as much as possible.
1294                  */
1295                 i = 1;
1296                 do {
1297                         struct rpcrdma_mw **mw;
1298                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1299                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1300                         *mw = NULL;
1301                 } while (++i < RPCRDMA_MAX_SEGS);
1302                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1303                                         &buffers->rb_mws);
1304                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1305                 break;
1306         default:
1307                 break;
1308         }
1309         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1310 }
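/*
 * Typical lifecycle as seen from the calling transport code (a
 * simplified sketch; error handling omitted):
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);	reserve req, rep, MWs
 *	... marshal the call, post the send, await the reply ...
 *	rpcrdma_buffer_put(req);			return everything
 *
 * rpcrdma_recv_buffer_get/put below handle the cases where a reply
 * buffer must be reattached or returned on its own.
 */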
1311
1312 /*
1313  * Recover reply buffers from pool.
1314  * This happens when recovering from error conditions.
1315  * Post-increment counter/array index.
1316  */
1317 void
1318 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1319 {
1320         struct rpcrdma_buffer *buffers = req->rl_buffer;
1321         unsigned long flags;
1322
1323         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1324                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1325         spin_lock_irqsave(&buffers->rb_lock, flags);
1326         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1327                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1328                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1329         }
1330         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1331 }
1332
1333 /*
1334  * Put reply buffers back into pool when not attached to
1335  * request. This happens in error conditions.
1336  */
1337 void
1338 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1339 {
1340         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1341         unsigned long flags;
1342
1343         rep->rr_func = NULL;
1344         spin_lock_irqsave(&buffers->rb_lock, flags);
1345         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1346         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1347 }
1348
1349 /*
1350  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1351  */
1352
1353 int
1354 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1355                                 struct ib_mr **mrp, struct ib_sge *iov)
1356 {
1357         struct ib_phys_buf ipb;
1358         struct ib_mr *mr;
1359         int rc;
1360
1361         /*
1362          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1363          */
1364         iov->addr = ib_dma_map_single(ia->ri_id->device,
1365                         va, len, DMA_BIDIRECTIONAL);
1366         iov->length = len;
1367
1368         if (ia->ri_have_dma_lkey) {
1369                 *mrp = NULL;
1370                 iov->lkey = ia->ri_dma_lkey;
1371                 return 0;
1372         } else if (ia->ri_bind_mem != NULL) {
1373                 *mrp = NULL;
1374                 iov->lkey = ia->ri_bind_mem->lkey;
1375                 return 0;
1376         }
1377
1378         ipb.addr = iov->addr;
1379         ipb.size = iov->length;
1380         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1381                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1382
1383         dprintk("RPC:       %s: phys convert: 0x%llx "
1384                         "registered 0x%llx length %d\n",
1385                         __func__, (unsigned long long)ipb.addr,
1386                         (unsigned long long)iov->addr, len);
1387
1388         if (IS_ERR(mr)) {
1389                 *mrp = NULL;
1390                 rc = PTR_ERR(mr);
1391                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1392         } else {
1393                 *mrp = mr;
1394                 iov->lkey = mr->lkey;
1395                 rc = 0;
1396         }
1397
1398         return rc;
1399 }
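/*
 * The registration above prefers the cheapest option available: the
 * device's global DMA lkey, then the pre-allocated ia->ri_bind_mem DMA
 * MR, and only as a last resort a one-off ib_reg_phys_mr() for the
 * single physically contiguous kmalloc'ed region.
 */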
1400
1401 int
1402 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1403                                 struct ib_mr *mr, struct ib_sge *iov)
1404 {
1405         int rc;
1406
1407         ib_dma_unmap_single(ia->ri_id->device,
1408                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1409
1410         if (NULL == mr)
1411                 return 0;
1412
1413         rc = ib_dereg_mr(mr);
1414         if (rc)
1415                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1416         return rc;
1417 }
1418
1419 /*
1420  * Wrappers for chunk registration, shared by read/write chunk code.
1421  */
1422
1423 static void
1424 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1425 {
1426         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1427         seg->mr_dmalen = seg->mr_len;
1428         if (seg->mr_page)
1429                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1430                                 seg->mr_page, offset_in_page(seg->mr_offset),
1431                                 seg->mr_dmalen, seg->mr_dir);
1432         else
1433                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1434                                 seg->mr_offset,
1435                                 seg->mr_dmalen, seg->mr_dir);
1436         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1437                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1438                         __func__,
1439                         (unsigned long long)seg->mr_dma,
1440                         seg->mr_offset, seg->mr_dmalen);
1441         }
1442 }
1443
1444 static void
1445 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1446 {
1447         if (seg->mr_page)
1448                 ib_dma_unmap_page(ia->ri_id->device,
1449                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1450         else
1451                 ib_dma_unmap_single(ia->ri_id->device,
1452                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1453 }
1454
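/*
 * Register an external chunk via fast registration.  In outline:
 * DMA-map each segment and collect its page addresses into the FRMR
 * page list, stopping early at the first hole; if the FRMR is still
 * marked valid from a previous use, chain a LOCAL_INV work request
 * ahead of the FAST_REG_MR request; bump the rkey so stale remote
 * references cannot match the new registration; then post the chain
 * with a single ib_post_send().
 */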
1455 static int
1456 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1457                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1458                         struct rpcrdma_xprt *r_xprt)
1459 {
1460         struct rpcrdma_mr_seg *seg1 = seg;
1461         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1462
1463         u8 key;
1464         int len, pageoff;
1465         int i, rc;
1466         int seg_len;
1467         u64 pa;
1468         int page_no;
1469
1470         pageoff = offset_in_page(seg1->mr_offset);
1471         seg1->mr_offset -= pageoff;     /* start of page */
1472         seg1->mr_len += pageoff;
1473         len = -pageoff;
1474         if (*nsegs > ia->ri_max_frmr_depth)
1475                 *nsegs = ia->ri_max_frmr_depth;
1476         for (page_no = i = 0; i < *nsegs;) {
1477                 rpcrdma_map_one(ia, seg, writing);
1478                 pa = seg->mr_dma;
1479                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1480                         seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1481                                 page_list[page_no++] = pa;
1482                         pa += PAGE_SIZE;
1483                 }
1484                 len += seg->mr_len;
1485                 ++seg;
1486                 ++i;
1487                 /* Check for holes: stop when a segment boundary is not page-aligned */
1488                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1489                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1490                         break;
1491         }
1492         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1493                 __func__, seg1->mr_chunk.rl_mw, i);
1494
1495         if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1496                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1497                         __func__,
1498                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1499                 /* Invalidate before using. */
1500                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1501                 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1502                 invalidate_wr.next = &frmr_wr;
1503                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1504                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1505                 invalidate_wr.ex.invalidate_rkey =
1506                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1507                 DECR_CQCOUNT(&r_xprt->rx_ep);
1508                 post_wr = &invalidate_wr;
1509         } else
1510                 post_wr = &frmr_wr;
1511
1512         /* Bump the rkey's low-order byte so this registration differs from the last one */
1513         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1514         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1515
1516         /* Prepare FRMR WR */
1517         memset(&frmr_wr, 0, sizeof frmr_wr);
1518         frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1519         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1520         frmr_wr.send_flags = IB_SEND_SIGNALED;
1521         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1522         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1523         frmr_wr.wr.fast_reg.page_list_len = page_no;
1524         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1525         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1526         BUG_ON(frmr_wr.wr.fast_reg.length < len);
1527         frmr_wr.wr.fast_reg.access_flags = (writing ?
1528                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1529                                 IB_ACCESS_REMOTE_READ);
1530         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1531         DECR_CQCOUNT(&r_xprt->rx_ep);
1532
1533         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1534
1535         if (rc) {
1536                 dprintk("RPC:       %s: failed ib_post_send for register,"
1537                         " status %i\n", __func__, rc);
1538                 while (i--)
1539                         rpcrdma_unmap_one(ia, --seg);
1540         } else {
1541                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1542                 seg1->mr_base = seg1->mr_dma + pageoff;
1543                 seg1->mr_nsegs = i;
1544                 seg1->mr_len = len;
1545         }
1546         *nsegs = i;
1547         return rc;
1548 }
1549
1550 static int
1551 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1552                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1553 {
1554         struct rpcrdma_mr_seg *seg1 = seg;
1555         struct ib_send_wr invalidate_wr, *bad_wr;
1556         int rc;
1557
1558         while (seg1->mr_nsegs--)
1559                 rpcrdma_unmap_one(ia, seg++);
1560
1561         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1562         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1563         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1564         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1565         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1566         DECR_CQCOUNT(&r_xprt->rx_ep);
1567
1568         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1569         if (rc)
1570                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1571                         " status %i\n", __func__, rc);
1572         return rc;
1573 }
1574
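/*
 * Register an external chunk using an FMR (the RPCRDMA_MTHCAFMR
 * strategy): DMA-map up to RPCRDMA_MAX_DATA_SEGS segments, gather
 * their DMA addresses, and map them with one ib_map_phys_fmr() call.
 * As in the FRMR path, the loop stops at the first hole.
 */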
1575 static int
1576 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1577                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1578 {
1579         struct rpcrdma_mr_seg *seg1 = seg;
1580         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1581         int len, pageoff, i, rc;
1582
1583         pageoff = offset_in_page(seg1->mr_offset);
1584         seg1->mr_offset -= pageoff;     /* start of page */
1585         seg1->mr_len += pageoff;
1586         len = -pageoff;
1587         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1588                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1589         for (i = 0; i < *nsegs;) {
1590                 rpcrdma_map_one(ia, seg, writing);
1591                 physaddrs[i] = seg->mr_dma;
1592                 len += seg->mr_len;
1593                 ++seg;
1594                 ++i;
1595                 /* Check for holes */
1596                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1597                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1598                         break;
1599         }
1600         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1601                                 physaddrs, i, seg1->mr_dma);
1602         if (rc) {
1603                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1604                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1605                         len, (unsigned long long)seg1->mr_dma,
1606                         pageoff, i, rc);
1607                 while (i--)
1608                         rpcrdma_unmap_one(ia, --seg);
1609         } else {
1610                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1611                 seg1->mr_base = seg1->mr_dma + pageoff;
1612                 seg1->mr_nsegs = i;
1613                 seg1->mr_len = len;
1614         }
1615         *nsegs = i;
1616         return rc;
1617 }
1618
1619 static int
1620 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1621                         struct rpcrdma_ia *ia)
1622 {
1623         struct rpcrdma_mr_seg *seg1 = seg;
1624         LIST_HEAD(l);
1625         int rc;
1626
1627         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1628         rc = ib_unmap_fmr(&l);
1629         while (seg1->mr_nsegs--)
1630                 rpcrdma_unmap_one(ia, seg++);
1631         if (rc)
1632                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1633                         " status %i\n", __func__, rc);
1634         return rc;
1635 }
1636
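/*
 * Register the given segment array using whichever memory registration
 * strategy the adapter selected at setup.  Returns the number of
 * segments coalesced into the registration, or -1 on failure.  A sketch
 * of the calling convention (hypothetical caller, illustration only):
 *
 *	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *	if (n < 0)
 *		... fail or fall back ...
 *	... advertise seg->mr_rkey, seg->mr_base, seg->mr_len ...
 *	rpcrdma_deregister_external(seg, r_xprt);
 */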
1637 int
1638 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1639                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1640 {
1641         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1642         int rc = 0;
1643
1644         switch (ia->ri_memreg_strategy) {
1645
1646 #if RPCRDMA_PERSISTENT_REGISTRATION
1647         case RPCRDMA_ALLPHYSICAL:
1648                 rpcrdma_map_one(ia, seg, writing);
1649                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1650                 seg->mr_base = seg->mr_dma;
1651                 seg->mr_nsegs = 1;
1652                 nsegs = 1;
1653                 break;
1654 #endif
1655
1656         /* Registration using fast registration memory regions (FRMR) */
1657         case RPCRDMA_FRMR:
1658                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1659                 break;
1660
1661         /* Registration using fast memory regions (FMR) */
1662         case RPCRDMA_MTHCAFMR:
1663                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1664                 break;
1665
1666         default:
1667                 return -1;
1668         }
1669         if (rc)
1670                 return -1;
1671
1672         return nsegs;
1673 }
1674
1675 int
1676 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1677                 struct rpcrdma_xprt *r_xprt)
1678 {
1679         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1680         int nsegs = seg->mr_nsegs, rc;
1681
1682         switch (ia->ri_memreg_strategy) {
1683
1684 #if RPCRDMA_PERSISTENT_REGISTRATION
1685         case RPCRDMA_ALLPHYSICAL:
1686                 BUG_ON(nsegs != 1);
1687                 rpcrdma_unmap_one(ia, seg);
1688                 rc = 0;
1689                 break;
1690 #endif
1691
1692         case RPCRDMA_FRMR:
1693                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1694                 break;
1695
1696         case RPCRDMA_MTHCAFMR:
1697                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1698                 break;
1699
1700         default:
1701                 break;
1702         }
1703         return nsegs;
1704 }
1705
1706 /*
1707  * Prepost any receive buffer, then post send.
1708  *
1709  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1710  */
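/*
 * Sends are normally posted unsignaled: DECR_CQCOUNT() counts down a
 * per-endpoint budget, and only when it is exhausted is the next send
 * posted with IB_SEND_SIGNALED and the budget reset via INIT_CQCOUNT(),
 * so the provider still takes a send completion every now and then.
 */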
1711 int
1712 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1713                 struct rpcrdma_ep *ep,
1714                 struct rpcrdma_req *req)
1715 {
1716         struct ib_send_wr send_wr, *send_wr_fail;
1717         struct rpcrdma_rep *rep = req->rl_reply;
1718         int rc;
1719
1720         if (rep) {
1721                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1722                 if (rc)
1723                         goto out;
1724                 req->rl_reply = NULL;
1725         }
1726
1727         send_wr.next = NULL;
1728         send_wr.wr_id = 0ULL;   /* no send cookie */
1729         send_wr.sg_list = req->rl_send_iov;
1730         send_wr.num_sge = req->rl_niovs;
1731         send_wr.opcode = IB_WR_SEND;
1732         if (send_wr.num_sge == 4)       /* the pad at [2] is constant; no need to sync it */
1733                 ib_dma_sync_single_for_device(ia->ri_id->device,
1734                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1735                         DMA_TO_DEVICE);
1736         ib_dma_sync_single_for_device(ia->ri_id->device,
1737                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1738                 DMA_TO_DEVICE);
1739         ib_dma_sync_single_for_device(ia->ri_id->device,
1740                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1741                 DMA_TO_DEVICE);
1742
1743         if (DECR_CQCOUNT(ep) > 0)
1744                 send_wr.send_flags = 0;
1745         else { /* Provider must take a send completion every now and then */
1746                 INIT_CQCOUNT(ep);
1747                 send_wr.send_flags = IB_SEND_SIGNALED;
1748         }
1749
1750         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1751         if (rc)
1752                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1753                         rc);
1754 out:
1755         return rc;
1756 }
1757
1758 /*
1759  * (Re)post a receive buffer.
1760  */
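/*
 * The rep pointer rides in wr_id so the receive completion handler can
 * find the reply buffer again when the completion fires.
 */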
1761 int
1762 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1763                      struct rpcrdma_ep *ep,
1764                      struct rpcrdma_rep *rep)
1765 {
1766         struct ib_recv_wr recv_wr, *recv_wr_fail;
1767         int rc;
1768
1769         recv_wr.next = NULL;
1770         recv_wr.wr_id = (u64) (unsigned long) rep;
1771         recv_wr.sg_list = &rep->rr_iov;
1772         recv_wr.num_sge = 1;
1773
1774         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1775                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1776
1777         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1778
1779         if (rc)
1780                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1781                         rc);
1782         return rc;
1783 }