xprtrdma: Reduce lock contention in completion handlers
[linux-2.6-block.git] net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/pci.h>  /* for Tavor hack below */
52 #include <linux/slab.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * Handle replies in tasklet context, using a single, global list.
70  * The rdma tasklet function simply walks the list and invokes the
71  * reply handler for each completed reply.
72  */
73
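   /*
    * rpcrdma_tk_lock_g protects the global reply list. It is taken both
    * from the receive completion upcall and from the tasklet itself, so
    * the irqsave/irqrestore variants are used on both sides.
    */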
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* the tasklet data argument is unused */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
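    /*
     * The wr_id of a send-side completion carries the rpcrdma_mw used by
     * FAST_REG_MR and LOCAL_INV work requests; a wr_id of zero indicates
     * an ordinary SEND completion, which needs no handling here.
     */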
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149
150         dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151                 __func__, frmr, wc->status, wc->opcode);
152
153         if (wc->wr_id == 0ULL)
154                 return;
155         if (wc->status != IB_WC_SUCCESS)
156                 return;
157
158         if (wc->opcode == IB_WC_FAST_REG_MR)
159                 frmr->r.frmr.state = FRMR_IS_VALID;
160         else if (wc->opcode == IB_WC_LOCAL_INV)
161                 frmr->r.frmr.state = FRMR_IS_INVALID;
162 }
163
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq)
166 {
167         struct ib_wc wc;
168         int rc;
169
170         while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
171                 rpcrdma_sendcq_process_wc(&wc);
172         return rc;
173 }
174
175 /*
176  * Handle send, fast_reg_mr, and local_inv completions.
177  *
178  * Send events are typically suppressed and thus do not result
179  * in an upcall. Occasionally one is signaled, however. This
180  * prevents the provider's completion queue from wrapping and
181  * losing a completion.
182  */
183 static void
184 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
185 {
186         int rc;
187
188         rc = rpcrdma_sendcq_poll(cq);
189         if (rc) {
190                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
191                         __func__, rc);
192                 return;
193         }
194
195         rc = ib_req_notify_cq(cq,
196                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
197         if (rc == 0)
198                 return;
199         if (rc < 0) {
200                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
201                         __func__, rc);
202                 return;
203         }
204
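            /* A positive return from ib_req_notify_cq() with
             * IB_CQ_REPORT_MISSED_EVENTS means completions may have been
             * added since the last poll; poll again now that the CQ has
             * been re-armed so none are left stranded.
             */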
205         rpcrdma_sendcq_poll(cq);
206 }
207
208 static void
209 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
210 {
211         struct rpcrdma_rep *rep =
212                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
213
214         dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
215                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
216
217         if (wc->status != IB_WC_SUCCESS) {
218                 rep->rr_len = ~0U;
219                 goto out_schedule;
220         }
221         if (wc->opcode != IB_WC_RECV)
222                 return;
223
224         rep->rr_len = wc->byte_len;
225         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
226                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
227
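            /* The credit value is carried in the fixed portion of the
             * RPC/RDMA header; the 16-byte check below ensures the whole
             * fixed header actually arrived before rm_credit is trusted,
             * and the value is clamped before being published to the
             * send path.
             */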
228         if (rep->rr_len >= 16) {
229                 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
230                 unsigned int credits = ntohl(p->rm_credit);
231
232                 if (credits == 0)
233                         credits = 1;    /* don't deadlock */
234                 else if (credits > rep->rr_buffer->rb_max_requests)
235                         credits = rep->rr_buffer->rb_max_requests;
236                 atomic_set(&rep->rr_buffer->rb_credits, credits);
237         }
238
239 out_schedule:
240         rpcrdma_schedule_tasklet(rep);
241 }
242
243 static int
244 rpcrdma_recvcq_poll(struct ib_cq *cq)
245 {
246         struct ib_wc wc;
247         int rc;
248
249         while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
250                 rpcrdma_recvcq_process_wc(&wc);
251         return rc;
252 }
253
254 /*
255  * Handle receive completions.
256  *
257  * The handler is reentrant, but processes events one at a time to
258  * preserve receive ordering, on which server credit accounting relies.
259  *
260  * It is the responsibility of the scheduled tasklet to return
261  * recv buffers to the pool. NOTE: this affects synchronization of
262  * connection shutdown. That is, the structures required for
263  * the completion of the reply handler must remain intact until
264  * all memory has been reclaimed.
265  */
266 static void
267 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
268 {
269         int rc;
270
271         rc = rpcrdma_recvcq_poll(cq);
272         if (rc) {
273                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
274                         __func__, rc);
275                 return;
276         }
277
278         rc = ib_req_notify_cq(cq,
279                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
280         if (rc == 0)
281                 return;
282         if (rc < 0) {
283                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
284                         __func__, rc);
285                 return;
286         }
287
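            /* As in the send path: a positive return means completions may
             * have raced with the re-arm, so drain the CQ once more.
             */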
288         rpcrdma_recvcq_poll(cq);
289 }
290
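    /*
     * Human-readable names (debug builds only) for the RDMA CM events
     * handled in rpcrdma_conn_upcall(), indexed by rdma_cm_event_type.
     */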
291 #ifdef RPC_DEBUG
292 static const char * const conn[] = {
293         "address resolved",
294         "address error",
295         "route resolved",
296         "route error",
297         "connect request",
298         "connect response",
299         "connect error",
300         "unreachable",
301         "rejected",
302         "established",
303         "disconnected",
304         "device removal"
305 };
306 #endif
307
308 static int
309 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
310 {
311         struct rpcrdma_xprt *xprt = id->context;
312         struct rpcrdma_ia *ia = &xprt->rx_ia;
313         struct rpcrdma_ep *ep = &xprt->rx_ep;
314 #ifdef RPC_DEBUG
315         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
316 #endif
317         struct ib_qp_attr attr;
318         struct ib_qp_init_attr iattr;
319         int connstate = 0;
320
321         switch (event->event) {
322         case RDMA_CM_EVENT_ADDR_RESOLVED:
323         case RDMA_CM_EVENT_ROUTE_RESOLVED:
324                 ia->ri_async_rc = 0;
325                 complete(&ia->ri_done);
326                 break;
327         case RDMA_CM_EVENT_ADDR_ERROR:
328                 ia->ri_async_rc = -EHOSTUNREACH;
329                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
330                         __func__, ep);
331                 complete(&ia->ri_done);
332                 break;
333         case RDMA_CM_EVENT_ROUTE_ERROR:
334                 ia->ri_async_rc = -ENETUNREACH;
335                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
336                         __func__, ep);
337                 complete(&ia->ri_done);
338                 break;
339         case RDMA_CM_EVENT_ESTABLISHED:
340                 connstate = 1;
341                 ib_query_qp(ia->ri_id->qp, &attr,
342                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
343                         &iattr);
344                 dprintk("RPC:       %s: %d responder resources"
345                         " (%d initiator)\n",
346                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
347                 goto connected;
348         case RDMA_CM_EVENT_CONNECT_ERROR:
349                 connstate = -ENOTCONN;
350                 goto connected;
351         case RDMA_CM_EVENT_UNREACHABLE:
352                 connstate = -ENETDOWN;
353                 goto connected;
354         case RDMA_CM_EVENT_REJECTED:
355                 connstate = -ECONNREFUSED;
356                 goto connected;
357         case RDMA_CM_EVENT_DISCONNECTED:
358                 connstate = -ECONNABORTED;
359                 goto connected;
360         case RDMA_CM_EVENT_DEVICE_REMOVAL:
361                 connstate = -ENODEV;
362 connected:
363                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
364                         __func__,
365                         (event->event <= 11) ? conn[event->event] :
366                                                 "unknown connection error",
367                         &addr->sin_addr.s_addr,
368                         ntohs(addr->sin_port),
369                         ep, event->event);
370                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
371                 dprintk("RPC:       %s: %sconnected\n",
372                                         __func__, connstate > 0 ? "" : "dis");
373                 ep->rep_connected = connstate;
374                 ep->rep_func(ep);
375                 wake_up_all(&ep->rep_connect_wait);
376                 break;
377         default:
378                 dprintk("RPC:       %s: unexpected CM event %d\n",
379                         __func__, event->event);
380                 break;
381         }
382
383 #ifdef RPC_DEBUG
384         if (connstate == 1) {
385                 int ird = attr.max_dest_rd_atomic;
386                 int tird = ep->rep_remote_cma.responder_resources;
387                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
388                         "on %s, memreg %d slots %d ird %d%s\n",
389                         &addr->sin_addr.s_addr,
390                         ntohs(addr->sin_port),
391                         ia->ri_id->device->name,
392                         ia->ri_memreg_strategy,
393                         xprt->rx_buf.rb_max_requests,
394                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
395         } else if (connstate < 0) {
396                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
397                         &addr->sin_addr.s_addr,
398                         ntohs(addr->sin_port),
399                         connstate);
400         }
401 #endif
402
403         return 0;
404 }
405
406 static struct rdma_cm_id *
407 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
408                         struct rpcrdma_ia *ia, struct sockaddr *addr)
409 {
410         struct rdma_cm_id *id;
411         int rc;
412
413         init_completion(&ia->ri_done);
414
415         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
416         if (IS_ERR(id)) {
417                 rc = PTR_ERR(id);
418                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
419                         __func__, rc);
420                 return id;
421         }
422
423         ia->ri_async_rc = -ETIMEDOUT;
424         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
425         if (rc) {
426                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
427                         __func__, rc);
428                 goto out;
429         }
430         wait_for_completion_interruptible_timeout(&ia->ri_done,
431                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
432         rc = ia->ri_async_rc;
433         if (rc)
434                 goto out;
435
436         ia->ri_async_rc = -ETIMEDOUT;
437         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
438         if (rc) {
439                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
440                         __func__, rc);
441                 goto out;
442         }
443         wait_for_completion_interruptible_timeout(&ia->ri_done,
444                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
445         rc = ia->ri_async_rc;
446         if (rc)
447                 goto out;
448
449         return id;
450
451 out:
452         rdma_destroy_id(id);
453         return ERR_PTR(rc);
454 }
455
456 /*
457  * Drain any cq, prior to teardown.
458  */
459 static void
460 rpcrdma_clean_cq(struct ib_cq *cq)
461 {
462         struct ib_wc wc;
463         int count = 0;
464
465         while (1 == ib_poll_cq(cq, 1, &wc))
466                 ++count;
467
468         if (count)
469                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
470                         __func__, count, wc.opcode);
471 }
472
473 /*
474  * Exported functions.
475  */
476
477 /*
478  * Open and initialize an Interface Adapter.
479  *  o initializes fields of struct rpcrdma_ia, including
480  *    interface and provider attributes and protection zone.
481  */
482 int
483 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
484 {
485         int rc, mem_priv;
486         struct ib_device_attr devattr;
487         struct rpcrdma_ia *ia = &xprt->rx_ia;
488
489         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
490         if (IS_ERR(ia->ri_id)) {
491                 rc = PTR_ERR(ia->ri_id);
492                 goto out1;
493         }
494
495         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
496         if (IS_ERR(ia->ri_pd)) {
497                 rc = PTR_ERR(ia->ri_pd);
498                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
499                         __func__, rc);
500                 goto out2;
501         }
502
503         /*
504          * Query the device to determine if the requested memory
505          * registration strategy is supported. If it isn't, set the
506          * strategy to a globally supported model.
507          */
508         rc = ib_query_device(ia->ri_id->device, &devattr);
509         if (rc) {
510                 dprintk("RPC:       %s: ib_query_device failed %d\n",
511                         __func__, rc);
512                 goto out2;
513         }
514
515         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
516                 ia->ri_have_dma_lkey = 1;
517                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
518         }
519
520         if (memreg == RPCRDMA_FRMR) {
521                 /* Requires both frmr reg and local dma lkey */
522                 if ((devattr.device_cap_flags &
523                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
524                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
525                         dprintk("RPC:       %s: FRMR registration "
526                                 "not supported by HCA\n", __func__);
527                         memreg = RPCRDMA_MTHCAFMR;
528                 } else {
529                         /* Mind the ia limit on FRMR page list depth */
530                         ia->ri_max_frmr_depth = min_t(unsigned int,
531                                 RPCRDMA_MAX_DATA_SEGS,
532                                 devattr.max_fast_reg_page_list_len);
533                 }
534         }
535         if (memreg == RPCRDMA_MTHCAFMR) {
536                 if (!ia->ri_id->device->alloc_fmr) {
537                         dprintk("RPC:       %s: MTHCAFMR registration "
538                                 "not supported by HCA\n", __func__);
539 #if RPCRDMA_PERSISTENT_REGISTRATION
540                         memreg = RPCRDMA_ALLPHYSICAL;
541 #else
542                         rc = -ENOMEM;
543                         goto out2;
544 #endif
545                 }
546         }
547
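            /*
             * At this point "memreg" is the mode that will actually be
             * used: the caller's choice, downgraded where the HCA lacks
             * support (FRMR falls back to FMR, and FMR falls back to
             * ALLPHYSICAL when that is built in).
             */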
548         /*
549          * Optionally obtain an underlying physical identity mapping in
550          * order to do a memory window-based bind. This base registration
551          * is protected from remote access - that is enabled only by binding
552          * for the specific bytes targeted during each RPC operation, and
553          * revoked after the corresponding completion similar to a storage
554          * adapter.
555          */
556         switch (memreg) {
557         case RPCRDMA_FRMR:
558                 break;
559 #if RPCRDMA_PERSISTENT_REGISTRATION
560         case RPCRDMA_ALLPHYSICAL:
561                 mem_priv = IB_ACCESS_LOCAL_WRITE |
562                                 IB_ACCESS_REMOTE_WRITE |
563                                 IB_ACCESS_REMOTE_READ;
564                 goto register_setup;
565 #endif
566         case RPCRDMA_MTHCAFMR:
567                 if (ia->ri_have_dma_lkey)
568                         break;
569                 mem_priv = IB_ACCESS_LOCAL_WRITE;
570 #if RPCRDMA_PERSISTENT_REGISTRATION
571         register_setup:
572 #endif
573                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
574                 if (IS_ERR(ia->ri_bind_mem)) {
575                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
576                                 "phys register failed with %lX\n",
577                                 __func__, PTR_ERR(ia->ri_bind_mem));
578                         rc = -ENOMEM;
579                         goto out2;
580                 }
581                 break;
582         default:
583                 printk(KERN_ERR "RPC: Unsupported memory "
584                                 "registration mode: %d\n", memreg);
585                 rc = -ENOMEM;
586                 goto out2;
587         }
588         dprintk("RPC:       %s: memory registration strategy is %d\n",
589                 __func__, memreg);
590
591         /* Else will do memory reg/dereg for each chunk */
592         ia->ri_memreg_strategy = memreg;
593
594         return 0;
595 out2:
596         rdma_destroy_id(ia->ri_id);
597         ia->ri_id = NULL;
598 out1:
599         return rc;
600 }
601
602 /*
603  * Clean up/close an IA.
604  *   o if event handles and PD have been initialized, free them.
605  *   o close the IA
606  */
607 void
608 rpcrdma_ia_close(struct rpcrdma_ia *ia)
609 {
610         int rc;
611
612         dprintk("RPC:       %s: entering\n", __func__);
613         if (ia->ri_bind_mem != NULL) {
614                 rc = ib_dereg_mr(ia->ri_bind_mem);
615                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
616                         __func__, rc);
617         }
618         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
619                 if (ia->ri_id->qp)
620                         rdma_destroy_qp(ia->ri_id);
621                 rdma_destroy_id(ia->ri_id);
622                 ia->ri_id = NULL;
623         }
624         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
625                 rc = ib_dealloc_pd(ia->ri_pd);
626                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
627                         __func__, rc);
628         }
629 }
630
631 /*
632  * Create unconnected endpoint.
633  */
634 int
635 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
636                                 struct rpcrdma_create_data_internal *cdata)
637 {
638         struct ib_device_attr devattr;
639         struct ib_cq *sendcq, *recvcq;
640         int rc, err;
641
642         rc = ib_query_device(ia->ri_id->device, &devattr);
643         if (rc) {
644                 dprintk("RPC:       %s: ib_query_device failed %d\n",
645                         __func__, rc);
646                 return rc;
647         }
648
649         /* check provider's send/recv wr limits */
650         if (cdata->max_requests > devattr.max_qp_wr)
651                 cdata->max_requests = devattr.max_qp_wr;
652
653         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
654         ep->rep_attr.qp_context = ep;
655         /* send_cq and recv_cq initialized below */
656         ep->rep_attr.srq = NULL;
657         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
658         switch (ia->ri_memreg_strategy) {
659         case RPCRDMA_FRMR: {
660                 int depth = 7;
661
662                 /* Add room for frmr register and invalidate WRs.
663                  * 1. FRMR reg WR for head
664                  * 2. FRMR invalidate WR for head
665                  * 3. N FRMR reg WRs for pagelist
666                  * 4. N FRMR invalidate WRs for pagelist
667                  * 5. FRMR reg WR for tail
668                  * 6. FRMR invalidate WR for tail
669                  * 7. The RDMA_SEND WR
670                  */
671
672                 /* Calculate N if the device max FRMR depth is smaller than
673                  * RPCRDMA_MAX_DATA_SEGS.
674                  */
675                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
676                         int delta = RPCRDMA_MAX_DATA_SEGS -
677                                     ia->ri_max_frmr_depth;
678
679                         do {
680                                 depth += 2; /* FRMR reg + invalidate */
681                                 delta -= ia->ri_max_frmr_depth;
682                         } while (delta > 0);
683
684                 }
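                    /* For example, if the device advertised a depth of 16
                     * pages and RPCRDMA_MAX_DATA_SEGS were 64, delta would
                     * start at 48 and the loop would run three times,
                     * leaving depth = 7 + 3 * 2 = 13.
                     */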
685                 ep->rep_attr.cap.max_send_wr *= depth;
686                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
687                         cdata->max_requests = devattr.max_qp_wr / depth;
688                         if (!cdata->max_requests)
689                                 return -EINVAL;
690                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
691                                                        depth;
692                 }
693                 break;
694         }
695         default:
696                 break;
697         }
698         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
699         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
700         ep->rep_attr.cap.max_recv_sge = 1;
701         ep->rep_attr.cap.max_inline_data = 0;
702         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
703         ep->rep_attr.qp_type = IB_QPT_RC;
704         ep->rep_attr.port_num = ~0;
705
706         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
707                 "iovs: send %d recv %d\n",
708                 __func__,
709                 ep->rep_attr.cap.max_send_wr,
710                 ep->rep_attr.cap.max_recv_wr,
711                 ep->rep_attr.cap.max_send_sge,
712                 ep->rep_attr.cap.max_recv_sge);
713
714         /* set trigger for requesting send completion */
715         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
716         if (ep->rep_cqinit <= 2)
717                 ep->rep_cqinit = 0;
718         INIT_CQCOUNT(ep);
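            /* rep_cqinit is the budget of unsignaled sends: the post path
             * decrements it via DECR_CQCOUNT() and, once it is used up,
             * posts a signaled WR and resets the count, so the send CQ
             * cannot silently overflow while most send completions stay
             * suppressed.
             */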
719         ep->rep_ia = ia;
720         init_waitqueue_head(&ep->rep_connect_wait);
721         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
722
723         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
724                                   rpcrdma_cq_async_error_upcall, NULL,
725                                   ep->rep_attr.cap.max_send_wr + 1, 0);
726         if (IS_ERR(sendcq)) {
727                 rc = PTR_ERR(sendcq);
728                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
729                         __func__, rc);
730                 goto out1;
731         }
732
733         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
734         if (rc) {
735                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
736                         __func__, rc);
737                 goto out2;
738         }
739
740         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
741                                   rpcrdma_cq_async_error_upcall, NULL,
742                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
743         if (IS_ERR(recvcq)) {
744                 rc = PTR_ERR(recvcq);
745                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
746                         __func__, rc);
747                 goto out2;
748         }
749
750         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
751         if (rc) {
752                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
753                         __func__, rc);
754                 ib_destroy_cq(recvcq);
755                 goto out2;
756         }
757
758         ep->rep_attr.send_cq = sendcq;
759         ep->rep_attr.recv_cq = recvcq;
760
761         /* Initialize cma parameters */
762
763         /* RPC/RDMA does not use private data */
764         ep->rep_remote_cma.private_data = NULL;
765         ep->rep_remote_cma.private_data_len = 0;
766
767         /* Client offers RDMA Read but does not initiate */
768         ep->rep_remote_cma.initiator_depth = 0;
769         if (devattr.max_qp_rd_atom > 32)        /* arbitrary but <= 255 */
770                 ep->rep_remote_cma.responder_resources = 32;
771         else
772                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
773
774         ep->rep_remote_cma.retry_count = 7;
775         ep->rep_remote_cma.flow_control = 0;
776         ep->rep_remote_cma.rnr_retry_count = 0;
777
778         return 0;
779
780 out2:
781         err = ib_destroy_cq(sendcq);
782         if (err)
783                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
784                         __func__, err);
785 out1:
786         return rc;
787 }
788
789 /*
790  * rpcrdma_ep_destroy
791  *
792  * Disconnect and destroy endpoint. After this, the only
793  * valid operations on the ep are to free it (if dynamically
794  * allocated) or re-create it.
795  */
796 void
797 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
798 {
799         int rc;
800
801         dprintk("RPC:       %s: entering, connected is %d\n",
802                 __func__, ep->rep_connected);
803
804         cancel_delayed_work_sync(&ep->rep_connect_worker);
805
806         if (ia->ri_id->qp) {
807                 rc = rpcrdma_ep_disconnect(ep, ia);
808                 if (rc)
809                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
810                                 " returned %i\n", __func__, rc);
811                 rdma_destroy_qp(ia->ri_id);
812                 ia->ri_id->qp = NULL;
813         }
814
815         /* padding - could be done in rpcrdma_buffer_destroy... */
816         if (ep->rep_pad_mr) {
817                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
818                 ep->rep_pad_mr = NULL;
819         }
820
821         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
822         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
823         if (rc)
824                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
825                         __func__, rc);
826
827         rpcrdma_clean_cq(ep->rep_attr.send_cq);
828         rc = ib_destroy_cq(ep->rep_attr.send_cq);
829         if (rc)
830                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
831                         __func__, rc);
832 }
833
834 /*
835  * Connect unconnected endpoint.
836  */
837 int
838 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
839 {
840         struct rdma_cm_id *id;
841         int rc = 0;
842         int retry_count = 0;
843
844         if (ep->rep_connected != 0) {
845                 struct rpcrdma_xprt *xprt;
846 retry:
847                 rc = rpcrdma_ep_disconnect(ep, ia);
848                 if (rc && rc != -ENOTCONN)
849                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
850                                 " status %i\n", __func__, rc);
851
852                 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
853                 rpcrdma_clean_cq(ep->rep_attr.send_cq);
854
855                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
856                 id = rpcrdma_create_id(xprt, ia,
857                                 (struct sockaddr *)&xprt->rx_data.addr);
858                 if (IS_ERR(id)) {
859                         rc = PTR_ERR(id);
860                         goto out;
861                 }
862                 /* TEMP TEMP TEMP - fail if new device:
863                  * Deregister/remarshal *all* requests!
864                  * Close and recreate adapter, pd, etc!
865                  * Re-determine all attributes still sane!
866                  * More stuff I haven't thought of!
867                  * Rrrgh!
868                  */
869                 if (ia->ri_id->device != id->device) {
870                         printk("RPC:       %s: can't reconnect on "
871                                 "different device!\n", __func__);
872                         rdma_destroy_id(id);
873                         rc = -ENETDOWN;
874                         goto out;
875                 }
876                 /* END TEMP */
877                 rdma_destroy_qp(ia->ri_id);
878                 rdma_destroy_id(ia->ri_id);
879                 ia->ri_id = id;
880         }
881
882         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
883         if (rc) {
884                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
885                         __func__, rc);
886                 goto out;
887         }
888
889 /* XXX Tavor device performs badly with 2K MTU! */
890 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
891         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
892         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
893             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
894              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
895                 struct ib_qp_attr attr = {
896                         .path_mtu = IB_MTU_1024
897                 };
898                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
899         }
900 }
901
902         ep->rep_connected = 0;
903
904         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
905         if (rc) {
906                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
907                                 __func__, rc);
908                 goto out;
909         }
910
911         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
912
913         /*
914          * Check state. A non-peer reject indicates no listener
915          * (ECONNREFUSED), which may be a transient state. All
916          * others indicate a transport condition for which a best-effort
917          * connection attempt has already been made.
918          */
919         if (ep->rep_connected == -ECONNREFUSED &&
920             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
921                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
922                 goto retry;
923         }
924         if (ep->rep_connected <= 0) {
925                 /* Sometimes, the only way to reliably connect to remote
926                  * CMs is to use same nonzero values for ORD and IRD. */
927                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
928                     (ep->rep_remote_cma.responder_resources == 0 ||
929                      ep->rep_remote_cma.initiator_depth !=
930                                 ep->rep_remote_cma.responder_resources)) {
931                         if (ep->rep_remote_cma.responder_resources == 0)
932                                 ep->rep_remote_cma.responder_resources = 1;
933                         ep->rep_remote_cma.initiator_depth =
934                                 ep->rep_remote_cma.responder_resources;
935                         goto retry;
936                 }
937                 rc = ep->rep_connected;
938         } else {
939                 dprintk("RPC:       %s: connected\n", __func__);
940         }
941
942 out:
943         if (rc)
944                 ep->rep_connected = rc;
945         return rc;
946 }
947
948 /*
949  * rpcrdma_ep_disconnect
950  *
951  * This is separate from destroy to facilitate the ability
952  * to reconnect without recreating the endpoint.
953  *
954  * This call is not reentrant, and must not be made in parallel
955  * on the same endpoint.
956  */
957 int
958 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
959 {
960         int rc;
961
962         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
963         rpcrdma_clean_cq(ep->rep_attr.send_cq);
964         rc = rdma_disconnect(ia->ri_id);
965         if (!rc) {
966                 /* returns without wait if not connected */
967                 wait_event_interruptible(ep->rep_connect_wait,
968                                                         ep->rep_connected != 1);
969                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
970                         (ep->rep_connected == 1) ? "still " : "dis");
971         } else {
972                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
973                 ep->rep_connected = rc;
974         }
975         return rc;
976 }
977
978 /*
979  * Initialize buffer memory
980  */
981 int
982 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
983         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
984 {
985         char *p;
986         size_t len;
987         int i, rc;
988         struct rpcrdma_mw *r;
989
990         buf->rb_max_requests = cdata->max_requests;
991         spin_lock_init(&buf->rb_lock);
992         atomic_set(&buf->rb_credits, 1);
993
994         /* Need to allocate:
995          *   1.  arrays for send and recv pointers
996          *   2.  arrays of struct rpcrdma_req to fill in pointers
997          *   3.  array of struct rpcrdma_rep for replies
998          *   4.  padding, if any
999          *   5.  mw's, fmr's or frmr's, if any
1000          * Send/recv buffers in req/rep need to be registered
1001          */
1002
1003         len = buf->rb_max_requests *
1004                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1005         len += cdata->padding;
1006         switch (ia->ri_memreg_strategy) {
1007         case RPCRDMA_FRMR:
1008                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1009                                 sizeof(struct rpcrdma_mw);
1010                 break;
1011         case RPCRDMA_MTHCAFMR:
1012                 /* TBD we are perhaps overallocating here */
1013                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1014                                 sizeof(struct rpcrdma_mw);
1015                 break;
1016         default:
1017                 break;
1018         }
1019
1020         /* allocate 1, 4 and 5 in one shot; 2 and 3 are kmalloc'ed per request below */
1021         p = kzalloc(len, GFP_KERNEL);
1022         if (p == NULL) {
1023                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1024                         __func__, len);
1025                 rc = -ENOMEM;
1026                 goto out;
1027         }
1028         buf->rb_pool = p;       /* for freeing it later */
1029
1030         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1031         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1032         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1033         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1034
1035         /*
1036          * Register the zeroed pad buffer, if any.
1037          */
1038         if (cdata->padding) {
1039                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1040                                             &ep->rep_pad_mr, &ep->rep_pad);
1041                 if (rc)
1042                         goto out;
1043         }
1044         p += cdata->padding;
1045
1046         INIT_LIST_HEAD(&buf->rb_mws);
1047         r = (struct rpcrdma_mw *)p;
1048         switch (ia->ri_memreg_strategy) {
1049         case RPCRDMA_FRMR:
1050                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1051                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1052                                                 ia->ri_max_frmr_depth);
1053                         if (IS_ERR(r->r.frmr.fr_mr)) {
1054                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1055                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1056                                         " failed %i\n", __func__, rc);
1057                                 goto out;
1058                         }
1059                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1060                                                 ia->ri_id->device,
1061                                                 ia->ri_max_frmr_depth);
1062                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1063                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1064                                 dprintk("RPC:       %s: "
1065                                         "ib_alloc_fast_reg_page_list "
1066                                         "failed %i\n", __func__, rc);
1067
1068                                 ib_dereg_mr(r->r.frmr.fr_mr);
1069                                 goto out;
1070                         }
1071                         list_add(&r->mw_list, &buf->rb_mws);
1072                         ++r;
1073                 }
1074                 break;
1075         case RPCRDMA_MTHCAFMR:
1076                 /* TBD we are perhaps overallocating here */
1077                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1078                         static struct ib_fmr_attr fa =
1079                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1080                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1081                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1082                                 &fa);
1083                         if (IS_ERR(r->r.fmr)) {
1084                                 rc = PTR_ERR(r->r.fmr);
1085                                 dprintk("RPC:       %s: ib_alloc_fmr"
1086                                         " failed %i\n", __func__, rc);
1087                                 goto out;
1088                         }
1089                         list_add(&r->mw_list, &buf->rb_mws);
1090                         ++r;
1091                 }
1092                 break;
1093         default:
1094                 break;
1095         }
1096
1097         /*
1098          * Allocate/init the request/reply buffers. Doing this
1099          * using kmalloc for now -- one for each buf.
1100          */
1101         for (i = 0; i < buf->rb_max_requests; i++) {
1102                 struct rpcrdma_req *req;
1103                 struct rpcrdma_rep *rep;
1104
1105                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1106                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1107                 /* Typical ~2400b, so rounding up saves work later */
1108                 if (len < 4096)
1109                         len = 4096;
1110                 req = kmalloc(len, GFP_KERNEL);
1111                 if (req == NULL) {
1112                         dprintk("RPC:       %s: request buffer %d alloc"
1113                                 " failed\n", __func__, i);
1114                         rc = -ENOMEM;
1115                         goto out;
1116                 }
1117                 memset(req, 0, sizeof(struct rpcrdma_req));
1118                 buf->rb_send_bufs[i] = req;
1119                 buf->rb_send_bufs[i]->rl_buffer = buf;
1120
1121                 rc = rpcrdma_register_internal(ia, req->rl_base,
1122                                 len - offsetof(struct rpcrdma_req, rl_base),
1123                                 &buf->rb_send_bufs[i]->rl_handle,
1124                                 &buf->rb_send_bufs[i]->rl_iov);
1125                 if (rc)
1126                         goto out;
1127
1128                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1129
1130                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1131                 rep = kmalloc(len, GFP_KERNEL);
1132                 if (rep == NULL) {
1133                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1134                                 __func__, i);
1135                         rc = -ENOMEM;
1136                         goto out;
1137                 }
1138                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1139                 buf->rb_recv_bufs[i] = rep;
1140                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1141
1142                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1143                                 len - offsetof(struct rpcrdma_rep, rr_base),
1144                                 &buf->rb_recv_bufs[i]->rr_handle,
1145                                 &buf->rb_recv_bufs[i]->rr_iov);
1146                 if (rc)
1147                         goto out;
1148
1149         }
1150         dprintk("RPC:       %s: max_requests %d\n",
1151                 __func__, buf->rb_max_requests);
1152         /* done */
1153         return 0;
1154 out:
1155         rpcrdma_buffer_destroy(buf);
1156         return rc;
1157 }
1158
1159 /*
1160  * Unregister and destroy buffer memory. Need to deal with
1161  * partial initialization, so it's callable from failed create.
1162  * Must be called before destroying endpoint, as registrations
1163  * reference it.
1164  */
1165 void
1166 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1167 {
1168         int rc, i;
1169         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1170         struct rpcrdma_mw *r;
1171
1172         /* clean up in reverse order from create
1173          *   1.  recv mr memory (mr free, then kfree)
1174          *   2.  send mr memory (mr free, then kfree)
1175          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1176          *   4.  arrays
1177          */
1178         dprintk("RPC:       %s: entering\n", __func__);
1179
1180         for (i = 0; i < buf->rb_max_requests; i++) {
1181                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1182                         rpcrdma_deregister_internal(ia,
1183                                         buf->rb_recv_bufs[i]->rr_handle,
1184                                         &buf->rb_recv_bufs[i]->rr_iov);
1185                         kfree(buf->rb_recv_bufs[i]);
1186                 }
1187                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1188                         rpcrdma_deregister_internal(ia,
1189                                         buf->rb_send_bufs[i]->rl_handle,
1190                                         &buf->rb_send_bufs[i]->rl_iov);
1191                         kfree(buf->rb_send_bufs[i]);
1192                 }
1193         }
1194
1195         while (!list_empty(&buf->rb_mws)) {
1196                 r = list_entry(buf->rb_mws.next,
1197                         struct rpcrdma_mw, mw_list);
1198                 list_del(&r->mw_list);
1199                 switch (ia->ri_memreg_strategy) {
1200                 case RPCRDMA_FRMR:
1201                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1202                         if (rc)
1203                                 dprintk("RPC:       %s:"
1204                                         " ib_dereg_mr"
1205                                         " failed %i\n",
1206                                         __func__, rc);
1207                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1208                         break;
1209                 case RPCRDMA_MTHCAFMR:
1210                         rc = ib_dealloc_fmr(r->r.fmr);
1211                         if (rc)
1212                                 dprintk("RPC:       %s:"
1213                                         " ib_dealloc_fmr"
1214                                         " failed %i\n",
1215                                         __func__, rc);
1216                         break;
1217                 default:
1218                         break;
1219                 }
1220         }
1221
1222         kfree(buf->rb_pool);
1223 }
1224
1225 /*
1226  * Get a set of request/reply buffers.
1227  *
1228  * Reply buffer (if needed) is attached to send buffer upon return.
1229  * Rule:
1230  *    rb_send_index and rb_recv_index MUST always be pointing to the
1231  *    *next* available buffer (non-NULL). They are incremented after
1232  *    removing buffers, and decremented *before* returning them.
1233  */
1234 struct rpcrdma_req *
1235 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1236 {
1237         struct rpcrdma_req *req;
1238         unsigned long flags;
1239         int i;
1240         struct rpcrdma_mw *r;
1241
1242         spin_lock_irqsave(&buffers->rb_lock, flags);
1243         if (buffers->rb_send_index == buffers->rb_max_requests) {
1244                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1245                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1246                 return ((struct rpcrdma_req *)NULL);
1247         }
1248
1249         req = buffers->rb_send_bufs[buffers->rb_send_index];
1250         if (buffers->rb_send_index < buffers->rb_recv_index) {
1251                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1252                         __func__,
1253                         buffers->rb_recv_index - buffers->rb_send_index);
1254                 req->rl_reply = NULL;
1255         } else {
1256                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1257                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1258         }
1259         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
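             /* The MW free list is provisioned with RPCRDMA_MAX_SEGS
              * entries per request, so whenever it is non-empty there are
              * enough entries here to fill every rl_segments slot.
              */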
1260         if (!list_empty(&buffers->rb_mws)) {
1261                 i = RPCRDMA_MAX_SEGS - 1;
1262                 do {
1263                         r = list_entry(buffers->rb_mws.next,
1264                                         struct rpcrdma_mw, mw_list);
1265                         list_del(&r->mw_list);
1266                         req->rl_segments[i].mr_chunk.rl_mw = r;
1267                 } while (--i >= 0);
1268         }
1269         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1270         return req;
1271 }
1272
1273 /*
1274  * Put request/reply buffers back into pool.
1275  * Pre-decrement counter/array index.
1276  */
1277 void
1278 rpcrdma_buffer_put(struct rpcrdma_req *req)
1279 {
1280         struct rpcrdma_buffer *buffers = req->rl_buffer;
1281         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1282         int i;
1283         unsigned long flags;
1284
1285         BUG_ON(req->rl_nchunks != 0);
1286         spin_lock_irqsave(&buffers->rb_lock, flags);
1287         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1288         req->rl_niovs = 0;
1289         if (req->rl_reply) {
1290                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1291                 req->rl_reply->rr_func = NULL;
1292                 req->rl_reply = NULL;
1293         }
1294         switch (ia->ri_memreg_strategy) {
1295         case RPCRDMA_FRMR:
1296         case RPCRDMA_MTHCAFMR:
1297                 /*
1298                  * Cycle mw's back in reverse order, and "spin" them.
1299                  * This delays and scrambles reuse as much as possible.
1300                  */
1301                 i = 1;
1302                 do {
1303                         struct rpcrdma_mw **mw;
1304                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1305                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1306                         *mw = NULL;
1307                 } while (++i < RPCRDMA_MAX_SEGS);
1308                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1309                                         &buffers->rb_mws);
1310                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1311                 break;
1312         default:
1313                 break;
1314         }
1315         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1316 }
1317
1318 /*
1319  * Recover reply buffers from pool.
1320  * This happens when recovering from error conditions.
1321  * Post-increment counter/array index.
1322  */
1323 void
1324 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1325 {
1326         struct rpcrdma_buffer *buffers = req->rl_buffer;
1327         unsigned long flags;
1328
1329         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1330                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1331         spin_lock_irqsave(&buffers->rb_lock, flags);
1332         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1333                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1334                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1335         }
1336         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1337 }
1338
1339 /*
1340  * Put reply buffers back into pool when not attached to
1341  * request. This happens in error conditions.
1342  */
1343 void
1344 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1345 {
1346         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1347         unsigned long flags;
1348
1349         rep->rr_func = NULL;
1350         spin_lock_irqsave(&buffers->rb_lock, flags);
1351         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1352         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1353 }
1354
1355 /*
1356  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1357  */
1358
1359 int
1360 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1361                                 struct ib_mr **mrp, struct ib_sge *iov)
1362 {
1363         struct ib_phys_buf ipb;
1364         struct ib_mr *mr;
1365         int rc;
1366
1367         /*
1368          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1369          */
1370         iov->addr = ib_dma_map_single(ia->ri_id->device,
1371                         va, len, DMA_BIDIRECTIONAL);
1372         iov->length = len;
1373
1374         if (ia->ri_have_dma_lkey) {
1375                 *mrp = NULL;
1376                 iov->lkey = ia->ri_dma_lkey;
1377                 return 0;
1378         } else if (ia->ri_bind_mem != NULL) {
1379                 *mrp = NULL;
1380                 iov->lkey = ia->ri_bind_mem->lkey;
1381                 return 0;
1382         }
1383
1384         ipb.addr = iov->addr;
1385         ipb.size = iov->length;
1386         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1387                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1388
1389         dprintk("RPC:       %s: phys convert: 0x%llx "
1390                         "registered 0x%llx length %d\n",
1391                         __func__, (unsigned long long)ipb.addr,
1392                         (unsigned long long)iov->addr, len);
1393
1394         if (IS_ERR(mr)) {
1395                 *mrp = NULL;
1396                 rc = PTR_ERR(mr);
1397                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1398         } else {
1399                 *mrp = mr;
1400                 iov->lkey = mr->lkey;
1401                 rc = 0;
1402         }
1403
1404         return rc;
1405 }
1406
1407 int
1408 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1409                                 struct ib_mr *mr, struct ib_sge *iov)
1410 {
1411         int rc;
1412
1413         ib_dma_unmap_single(ia->ri_id->device,
1414                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1415
1416         if (NULL == mr)
1417                 return 0;
1418
1419         rc = ib_dereg_mr(mr);
1420         if (rc)
1421                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1422         return rc;
1423 }
1424
1425 /*
1426  * Wrappers for chunk registration, shared by read/write chunk code.
1427  */
1428
1429 static void
1430 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1431 {
1432         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1433         seg->mr_dmalen = seg->mr_len;
1434         if (seg->mr_page)
1435                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1436                                 seg->mr_page, offset_in_page(seg->mr_offset),
1437                                 seg->mr_dmalen, seg->mr_dir);
1438         else
1439                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1440                                 seg->mr_offset,
1441                                 seg->mr_dmalen, seg->mr_dir);
1442         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1443                 dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dmalen %zu\n",
1444                         __func__,
1445                         (unsigned long long)seg->mr_dma,
1446                         seg->mr_offset, seg->mr_dmalen);
1447         }
1448 }
1449
1450 static void
1451 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1452 {
1453         if (seg->mr_page)
1454                 ib_dma_unmap_page(ia->ri_id->device,
1455                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1456         else
1457                 ib_dma_unmap_single(ia->ri_id->device,
1458                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1459 }
1460
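/*
 * Register a run of physically discontiguous segments behind a single
 * FRMR (fast registration work request).  On success the first segment
 * carries the resulting rkey, base address and total length, and
 * *nsegs is updated to the number of segments actually covered.
 */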
1461 static int
1462 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1463                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1464                         struct rpcrdma_xprt *r_xprt)
1465 {
1466         struct rpcrdma_mr_seg *seg1 = seg;
1467         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1468
1469         u8 key;
1470         int len, pageoff;
1471         int i, rc;
1472         int seg_len;
1473         u64 pa;
1474         int page_no;
1475
1476         pageoff = offset_in_page(seg1->mr_offset);
1477         seg1->mr_offset -= pageoff;     /* start of page */
1478         seg1->mr_len += pageoff;
1479         len = -pageoff;
1480         if (*nsegs > ia->ri_max_frmr_depth)
1481                 *nsegs = ia->ri_max_frmr_depth;
1482         for (page_no = i = 0; i < *nsegs;) {
1483                 rpcrdma_map_one(ia, seg, writing);
1484                 pa = seg->mr_dma;
1485                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1486                         seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1487                                 page_list[page_no++] = pa;
1488                         pa += PAGE_SIZE;
1489                 }
1490                 len += seg->mr_len;
1491                 ++seg;
1492                 ++i;
1493         /* Check for holes: stop coalescing at the first segment not page-aligned at both ends */
1494                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1495                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1496                         break;
1497         }
1498         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1499                 __func__, seg1->mr_chunk.rl_mw, i);
1500
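        /*
         * If this FRMR is still marked valid, a previous registration
         * was never invalidated (for example after a retransmit or a
         * flushed completion).  Chain a LOCAL_INV ahead of the FAST_REG
         * so the MR is free again before it is reused.
         */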
1501         if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1502                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1503                         __func__,
1504                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1505                 /* Invalidate before using. */
1506                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1507                 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1508                 invalidate_wr.next = &frmr_wr;
1509                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1510                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1511                 invalidate_wr.ex.invalidate_rkey =
1512                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1513                 DECR_CQCOUNT(&r_xprt->rx_ep);
1514                 post_wr = &invalidate_wr;
1515         } else
1516                 post_wr = &frmr_wr;
1517
1518         /* Bump the key: advance the rkey's low-order byte so this registration uses a fresh key */
1519         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1520         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1521
1522         /* Prepare FRMR WR */
1523         memset(&frmr_wr, 0, sizeof frmr_wr);
1524         frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1525         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1526         frmr_wr.send_flags = IB_SEND_SIGNALED;
1527         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1528         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1529         frmr_wr.wr.fast_reg.page_list_len = page_no;
1530         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1531         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1532         BUG_ON(frmr_wr.wr.fast_reg.length < len);
1533         frmr_wr.wr.fast_reg.access_flags = (writing ?
1534                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1535                                 IB_ACCESS_REMOTE_READ);
1536         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1537         DECR_CQCOUNT(&r_xprt->rx_ep);
1538
1539         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1540
1541         if (rc) {
1542                 dprintk("RPC:       %s: failed ib_post_send for register,"
1543                         " status %i\n", __func__, rc);
1544                 while (i--)
1545                         rpcrdma_unmap_one(ia, --seg);
1546         } else {
1547                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1548                 seg1->mr_base = seg1->mr_dma + pageoff;
1549                 seg1->mr_nsegs = i;
1550                 seg1->mr_len = len;
1551         }
1552         *nsegs = i;
1553         return rc;
1554 }
1555
1556 static int
1557 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1558                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1559 {
1560         struct rpcrdma_mr_seg *seg1 = seg;
1561         struct ib_send_wr invalidate_wr, *bad_wr;
1562         int rc;
1563
1564         while (seg1->mr_nsegs--)
1565                 rpcrdma_unmap_one(ia, seg++);
1566
1567         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1568         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1569         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1570         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1571         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1572         DECR_CQCOUNT(&r_xprt->rx_ep);
1573
1574         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1575         if (rc)
1576                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1577                         " status %i\n", __func__, rc);
1578         return rc;
1579 }
1580
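/*
 * Same contract as the FRMR path above, but implemented with FMRs:
 * the segments' DMA addresses are gathered into physaddrs[] and
 * mapped in one ib_map_phys_fmr() call.
 */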
1581 static int
1582 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1583                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1584 {
1585         struct rpcrdma_mr_seg *seg1 = seg;
1586         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1587         int len, pageoff, i, rc;
1588
1589         pageoff = offset_in_page(seg1->mr_offset);
1590         seg1->mr_offset -= pageoff;     /* start of page */
1591         seg1->mr_len += pageoff;
1592         len = -pageoff;
1593         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1594                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1595         for (i = 0; i < *nsegs;) {
1596                 rpcrdma_map_one(ia, seg, writing);
1597                 physaddrs[i] = seg->mr_dma;
1598                 len += seg->mr_len;
1599                 ++seg;
1600                 ++i;
1601                 /* Check for holes */
1602                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1603                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1604                         break;
1605         }
1606         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1607                                 physaddrs, i, seg1->mr_dma);
1608         if (rc) {
1609                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1610                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1611                         len, (unsigned long long)seg1->mr_dma,
1612                         pageoff, i, rc);
1613                 while (i--)
1614                         rpcrdma_unmap_one(ia, --seg);
1615         } else {
1616                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1617                 seg1->mr_base = seg1->mr_dma + pageoff;
1618                 seg1->mr_nsegs = i;
1619                 seg1->mr_len = len;
1620         }
1621         *nsegs = i;
1622         return rc;
1623 }
1624
1625 static int
1626 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1627                         struct rpcrdma_ia *ia)
1628 {
1629         struct rpcrdma_mr_seg *seg1 = seg;
1630         LIST_HEAD(l);
1631         int rc;
1632
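        /* ib_unmap_fmr() takes a list, so wrap the single FMR in one. */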
1633         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1634         rc = ib_unmap_fmr(&l);
1635         while (seg1->mr_nsegs--)
1636                 rpcrdma_unmap_one(ia, seg++);
1637         if (rc)
1638                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1639                         " status %i\n", __func__, rc);
1640         return rc;
1641 }
1642
1643 int
1644 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1645                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1646 {
1647         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1648         int rc = 0;
1649
1650         switch (ia->ri_memreg_strategy) {
1651
1652 #if RPCRDMA_PERSISTENT_REGISTRATION
1653         case RPCRDMA_ALLPHYSICAL:
1654                 rpcrdma_map_one(ia, seg, writing);
1655                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1656                 seg->mr_base = seg->mr_dma;
1657                 seg->mr_nsegs = 1;
1658                 nsegs = 1;
1659                 break;
1660 #endif
1661
1662         /* Registration using frmr registration */
1663         case RPCRDMA_FRMR:
1664                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1665                 break;
1666
1667         /* Registration using fmr memory registration */
1668         case RPCRDMA_MTHCAFMR:
1669                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1670                 break;
1671
1672         default:
1673                 return -1;
1674         }
1675         if (rc)
1676                 return -1;
1677
1678         return nsegs;
1679 }
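
/*
 * Illustrative caller pattern for the two entry points above and below
 * (a sketch only; the real callers live in the chunk marshaling code,
 * outside this file):
 *
 *	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *	if (n <= 0)
 *		... fail, or fall back to another strategy ...
 *	... emit one chunk from seg->mr_rkey/mr_base/mr_len,
 *	    then advance by n segments ...
 *	...
 *	rpcrdma_deregister_external(seg, r_xprt);
 */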
1680
1681 int
1682 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1683                 struct rpcrdma_xprt *r_xprt)
1684 {
1685         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1686         int nsegs = seg->mr_nsegs, rc;
1687
1688         switch (ia->ri_memreg_strategy) {
1689
1690 #if RPCRDMA_PERSISTENT_REGISTRATION
1691         case RPCRDMA_ALLPHYSICAL:
1692                 BUG_ON(nsegs != 1);
1693                 rpcrdma_unmap_one(ia, seg);
1694                 rc = 0;
1695                 break;
1696 #endif
1697
1698         case RPCRDMA_FRMR:
1699                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1700                 break;
1701
1702         case RPCRDMA_MTHCAFMR:
1703                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1704                 break;
1705
1706         default:
1707                 break;
1708         }
1709         return nsegs;
1710 }
1711
1712 /*
1713  * Prepost any receive buffer, then post send.
1714  *
1715  * The receive buffer is donated to the hardware and reclaimed when the receive completes.
1716  */
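/*
 * The receive buffer must be in place before the send is posted: the
 * peer's reply can arrive as soon as the request hits the wire, and a
 * missing receive at that point would be a receiver-not-ready error.
 */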
1717 int
1718 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1719                 struct rpcrdma_ep *ep,
1720                 struct rpcrdma_req *req)
1721 {
1722         struct ib_send_wr send_wr, *send_wr_fail;
1723         struct rpcrdma_rep *rep = req->rl_reply;
1724         int rc;
1725
1726         if (rep) {
1727                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1728                 if (rc)
1729                         goto out;
1730                 req->rl_reply = NULL;
1731         }
1732
1733         send_wr.next = NULL;
1734         send_wr.wr_id = 0ULL;   /* no send cookie */
1735         send_wr.sg_list = req->rl_send_iov;
1736         send_wr.num_sge = req->rl_niovs;
1737         send_wr.opcode = IB_WR_SEND;
1738         if (send_wr.num_sge == 4)       /* pad at [2] is constant, skip it; sync trailing data at [3] */
1739                 ib_dma_sync_single_for_device(ia->ri_id->device,
1740                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1741                         DMA_TO_DEVICE);
1742         ib_dma_sync_single_for_device(ia->ri_id->device,
1743                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1744                 DMA_TO_DEVICE);
1745         ib_dma_sync_single_for_device(ia->ri_id->device,
1746                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1747                 DMA_TO_DEVICE);
1748
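        /*
         * Sends are normally posted unsignaled to keep send-completion
         * traffic to a minimum; once the counter runs out, a signaled
         * send forces a completion so the provider can retire the
         * accumulated send WRs.
         */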
1749         if (DECR_CQCOUNT(ep) > 0)
1750                 send_wr.send_flags = 0;
1751         else { /* Provider must take a send completion every now and then */
1752                 INIT_CQCOUNT(ep);
1753                 send_wr.send_flags = IB_SEND_SIGNALED;
1754         }
1755
1756         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1757         if (rc)
1758                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1759                         rc);
1760 out:
1761         return rc;
1762 }
1763
1764 /*
1765  * (Re)post a receive buffer.
1766  */
1767 int
1768 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1769                      struct rpcrdma_ep *ep,
1770                      struct rpcrdma_rep *rep)
1771 {
1772         struct ib_recv_wr recv_wr, *recv_wr_fail;
1773         int rc;
1774
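        /*
         * The rep pointer rides in wr_id so that the receive completion
         * handler can recover the rpcrdma_rep that owns this buffer.
         */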
1775         recv_wr.next = NULL;
1776         recv_wr.wr_id = (u64) (unsigned long) rep;
1777         recv_wr.sg_list = &rep->rr_iov;
1778         recv_wr.num_sge = 1;
1779
1780         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1781                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1782
1783         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1784
1785         if (rc)
1786                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1787                         rc);
1788         return rc;
1789 }