xprtrdma: mind the device's max fast register page list depth
net/sunrpc/xprtrdma/verbs.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/pci.h>  /* for Tavor hack below */
52 #include <linux/slab.h>
53
54 #include "xprt_rdma.h"
55
56 /*
57  * Globals/Macros
58  */
59
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY        RPCDBG_TRANS
62 #endif
63
64 /*
65  * internal functions
66  */
67
68 /*
69  * handle replies in tasklet context, using a single, global list
70  * rdma tasklet function -- just turn around and call the func
71  * for all replies on the list
72  */
73
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80         struct rpcrdma_rep *rep;
81         void (*func)(struct rpcrdma_rep *);
82         unsigned long flags;
83
84         data = data;    /* the tasklet data argument is unused */
85         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86         while (!list_empty(&rpcrdma_tasklets_g)) {
87                 rep = list_entry(rpcrdma_tasklets_g.next,
88                                  struct rpcrdma_rep, rr_list);
89                 list_del(&rep->rr_list);
90                 func = rep->rr_func;
91                 rep->rr_func = NULL;
92                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93
94                 if (func)
95                         func(rep);
96                 else
97                         rpcrdma_recv_buffer_put(rep);
98
99                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100         }
101         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109         unsigned long flags;
110
111         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112         list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114         tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120         struct rpcrdma_ep *ep = context;
121
122         dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123                 __func__, event->event, event->device->name, context);
124         if (ep->rep_connected == 1) {
125                 ep->rep_connected = -EIO;
126                 ep->rep_func(ep);
127                 wake_up_all(&ep->rep_connect_wait);
128         }
129 }
130
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134         struct rpcrdma_ep *ep = context;
135
136         dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137                 __func__, event->event, event->device->name, context);
138         if (ep->rep_connected == 1) {
139                 ep->rep_connected = -EIO;
140                 ep->rep_func(ep);
141                 wake_up_all(&ep->rep_connect_wait);
142         }
143 }
144
145 static inline
146 void rpcrdma_event_process(struct ib_wc *wc)
147 {
148         struct rpcrdma_mw *frmr;
149         struct rpcrdma_rep *rep =
150                         (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
151
152         dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
153                 __func__, rep, wc->status, wc->opcode, wc->byte_len);
154
155         if (!rep) /* send or bind completion that we don't care about */
156                 return;
157
158         if (IB_WC_SUCCESS != wc->status) {
159                 dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n",
160                         __func__, wc->opcode, wc->status);
161                 rep->rr_len = ~0U;
162                 if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
163                         rpcrdma_schedule_tasklet(rep);
164                 return;
165         }
166
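	/* Note: for the fast-register and local-invalidate cases below,
	 * wr_id actually carries a struct rpcrdma_mw pointer rather than
	 * an rpcrdma_rep; the cast above is only meaningful for receive
	 * (and bind) completions.
	 */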
167         switch (wc->opcode) {
168         case IB_WC_FAST_REG_MR:
169                 frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
170                 frmr->r.frmr.state = FRMR_IS_VALID;
171                 break;
172         case IB_WC_LOCAL_INV:
173                 frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
174                 frmr->r.frmr.state = FRMR_IS_INVALID;
175                 break;
176         case IB_WC_RECV:
177                 rep->rr_len = wc->byte_len;
178                 ib_dma_sync_single_for_cpu(
179                         rdmab_to_ia(rep->rr_buffer)->ri_id->device,
180                         rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
181                 /* Keep (only) the most recent credits, after checking validity */
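                /* 16 bytes covers the four fixed 32-bit words at the start
                 * of the RPC-over-RDMA header (xid, vers, credit, type), so
                 * rm_credit can be read safely once this check passes.
                 */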
182                 if (rep->rr_len >= 16) {
183                         struct rpcrdma_msg *p =
184                                         (struct rpcrdma_msg *) rep->rr_base;
185                         unsigned int credits = ntohl(p->rm_credit);
186                         if (credits == 0) {
187                                 dprintk("RPC:       %s: server"
188                                         " dropped credits to 0!\n", __func__);
189                                 /* don't deadlock */
190                                 credits = 1;
191                         } else if (credits > rep->rr_buffer->rb_max_requests) {
192                                 dprintk("RPC:       %s: server"
193                                         " over-crediting: %d (%d)\n",
194                                         __func__, credits,
195                                         rep->rr_buffer->rb_max_requests);
196                                 credits = rep->rr_buffer->rb_max_requests;
197                         }
198                         atomic_set(&rep->rr_buffer->rb_credits, credits);
199                 }
200                 /* fall through */
201         case IB_WC_BIND_MW:
202                 rpcrdma_schedule_tasklet(rep);
203                 break;
204         default:
205                 dprintk("RPC:       %s: unexpected WC event %X\n",
206                         __func__, wc->opcode);
207                 break;
208         }
209 }
210
211 static inline int
212 rpcrdma_cq_poll(struct ib_cq *cq)
213 {
214         struct ib_wc wc;
215         int rc;
216
217         for (;;) {
218                 rc = ib_poll_cq(cq, 1, &wc);
219                 if (rc < 0) {
220                         dprintk("RPC:       %s: ib_poll_cq failed %i\n",
221                                 __func__, rc);
222                         return rc;
223                 }
224                 if (rc == 0)
225                         break;
226
227                 rpcrdma_event_process(&wc);
228         }
229
230         return 0;
231 }
232
233 /*
234  * rpcrdma_cq_event_upcall
235  *
236  * This upcall handles recv, send, bind and unbind events.
237  * It is reentrant but processes single events in order to maintain
238  * the ordering of receives on which server credit accounting depends.
239  *
240  * It is the responsibility of the scheduled tasklet to return
241  * recv buffers to the pool. NOTE: this affects synchronization of
242  * connection shutdown. That is, the structures required for
243  * the completion of the reply handler must remain intact until
244  * all memory has been reclaimed.
245  *
246  * Note that send events are suppressed and do not result in an upcall.
247  */
248 static void
249 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
250 {
251         int rc;
252
253         rc = rpcrdma_cq_poll(cq);
254         if (rc)
255                 return;
256
257         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
258         if (rc) {
259                 dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
260                         __func__, rc);
261                 return;
262         }
263
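	/* Poll once more after re-arming the CQ: completions that arrived
	 * between the first poll and ib_req_notify_cq() would otherwise be
	 * stranded until the next upcall.
	 */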
264         rpcrdma_cq_poll(cq);
265 }
266
267 #ifdef RPC_DEBUG
268 static const char * const conn[] = {
269         "address resolved",
270         "address error",
271         "route resolved",
272         "route error",
273         "connect request",
274         "connect response",
275         "connect error",
276         "unreachable",
277         "rejected",
278         "established",
279         "disconnected",
280         "device removal"
281 };
282 #endif
283
284 static int
285 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
286 {
287         struct rpcrdma_xprt *xprt = id->context;
288         struct rpcrdma_ia *ia = &xprt->rx_ia;
289         struct rpcrdma_ep *ep = &xprt->rx_ep;
290 #ifdef RPC_DEBUG
291         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
292 #endif
293         struct ib_qp_attr attr;
294         struct ib_qp_init_attr iattr;
295         int connstate = 0;
296
297         switch (event->event) {
298         case RDMA_CM_EVENT_ADDR_RESOLVED:
299         case RDMA_CM_EVENT_ROUTE_RESOLVED:
300                 ia->ri_async_rc = 0;
301                 complete(&ia->ri_done);
302                 break;
303         case RDMA_CM_EVENT_ADDR_ERROR:
304                 ia->ri_async_rc = -EHOSTUNREACH;
305                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
306                         __func__, ep);
307                 complete(&ia->ri_done);
308                 break;
309         case RDMA_CM_EVENT_ROUTE_ERROR:
310                 ia->ri_async_rc = -ENETUNREACH;
311                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
312                         __func__, ep);
313                 complete(&ia->ri_done);
314                 break;
315         case RDMA_CM_EVENT_ESTABLISHED:
316                 connstate = 1;
317                 ib_query_qp(ia->ri_id->qp, &attr,
318                         IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
319                         &iattr);
320                 dprintk("RPC:       %s: %d responder resources"
321                         " (%d initiator)\n",
322                         __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
323                 goto connected;
324         case RDMA_CM_EVENT_CONNECT_ERROR:
325                 connstate = -ENOTCONN;
326                 goto connected;
327         case RDMA_CM_EVENT_UNREACHABLE:
328                 connstate = -ENETDOWN;
329                 goto connected;
330         case RDMA_CM_EVENT_REJECTED:
331                 connstate = -ECONNREFUSED;
332                 goto connected;
333         case RDMA_CM_EVENT_DISCONNECTED:
334                 connstate = -ECONNABORTED;
335                 goto connected;
336         case RDMA_CM_EVENT_DEVICE_REMOVAL:
337                 connstate = -ENODEV;
338 connected:
339                 dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
340                         __func__,
341                         (event->event <= 11) ? conn[event->event] :
342                                                 "unknown connection error",
343                         &addr->sin_addr.s_addr,
344                         ntohs(addr->sin_port),
345                         ep, event->event);
346                 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
347                 dprintk("RPC:       %s: %sconnected\n",
348                                         __func__, connstate > 0 ? "" : "dis");
349                 ep->rep_connected = connstate;
350                 ep->rep_func(ep);
351                 wake_up_all(&ep->rep_connect_wait);
352                 break;
353         default:
354                 dprintk("RPC:       %s: unexpected CM event %d\n",
355                         __func__, event->event);
356                 break;
357         }
358
359 #ifdef RPC_DEBUG
360         if (connstate == 1) {
361                 int ird = attr.max_dest_rd_atomic;
362                 int tird = ep->rep_remote_cma.responder_resources;
363                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
364                         "on %s, memreg %d slots %d ird %d%s\n",
365                         &addr->sin_addr.s_addr,
366                         ntohs(addr->sin_port),
367                         ia->ri_id->device->name,
368                         ia->ri_memreg_strategy,
369                         xprt->rx_buf.rb_max_requests,
370                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
371         } else if (connstate < 0) {
372                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
373                         &addr->sin_addr.s_addr,
374                         ntohs(addr->sin_port),
375                         connstate);
376         }
377 #endif
378
379         return 0;
380 }
381
382 static struct rdma_cm_id *
383 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
384                         struct rpcrdma_ia *ia, struct sockaddr *addr)
385 {
386         struct rdma_cm_id *id;
387         int rc;
388
389         init_completion(&ia->ri_done);
390
391         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
392         if (IS_ERR(id)) {
393                 rc = PTR_ERR(id);
394                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
395                         __func__, rc);
396                 return id;
397         }
398
399         ia->ri_async_rc = -ETIMEDOUT;
400         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
401         if (rc) {
402                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
403                         __func__, rc);
404                 goto out;
405         }
406         wait_for_completion_interruptible_timeout(&ia->ri_done,
407                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
408         rc = ia->ri_async_rc;
409         if (rc)
410                 goto out;
411
412         ia->ri_async_rc = -ETIMEDOUT;
413         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
414         if (rc) {
415                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
416                         __func__, rc);
417                 goto out;
418         }
419         wait_for_completion_interruptible_timeout(&ia->ri_done,
420                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
421         rc = ia->ri_async_rc;
422         if (rc)
423                 goto out;
424
425         return id;
426
427 out:
428         rdma_destroy_id(id);
429         return ERR_PTR(rc);
430 }
431
432 /*
433  * Drain any cq, prior to teardown.
434  */
435 static void
436 rpcrdma_clean_cq(struct ib_cq *cq)
437 {
438         struct ib_wc wc;
439         int count = 0;
440
441         while (1 == ib_poll_cq(cq, 1, &wc))
442                 ++count;
443
444         if (count)
445                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
446                         __func__, count, wc.opcode);
447 }
448
449 /*
450  * Exported functions.
451  */
452
453 /*
454  * Open and initialize an Interface Adapter.
455  *  o initializes fields of struct rpcrdma_ia, including
456  *    interface and provider attributes and protection zone.
457  */
458 int
459 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
460 {
461         int rc, mem_priv;
462         struct ib_device_attr devattr;
463         struct rpcrdma_ia *ia = &xprt->rx_ia;
464
465         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
466         if (IS_ERR(ia->ri_id)) {
467                 rc = PTR_ERR(ia->ri_id);
468                 goto out1;
469         }
470
471         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
472         if (IS_ERR(ia->ri_pd)) {
473                 rc = PTR_ERR(ia->ri_pd);
474                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
475                         __func__, rc);
476                 goto out2;
477         }
478
479         /*
480          * Query the device to determine if the requested memory
481          * registration strategy is supported. If it isn't, set the
482          * strategy to a globally supported model.
483          */
484         rc = ib_query_device(ia->ri_id->device, &devattr);
485         if (rc) {
486                 dprintk("RPC:       %s: ib_query_device failed %d\n",
487                         __func__, rc);
488                 goto out2;
489         }
490
491         if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
492                 ia->ri_have_dma_lkey = 1;
493                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
494         }
495
496         switch (memreg) {
497         case RPCRDMA_MEMWINDOWS:
498         case RPCRDMA_MEMWINDOWS_ASYNC:
499                 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
500                         dprintk("RPC:       %s: MEMWINDOWS registration "
501                                 "specified but not supported by adapter, "
502                                 "using slower RPCRDMA_REGISTER\n",
503                                 __func__);
504                         memreg = RPCRDMA_REGISTER;
505                 }
506                 break;
507         case RPCRDMA_MTHCAFMR:
508                 if (!ia->ri_id->device->alloc_fmr) {
509 #if RPCRDMA_PERSISTENT_REGISTRATION
510                         dprintk("RPC:       %s: MTHCAFMR registration "
511                                 "specified but not supported by adapter, "
512                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
513                                 __func__);
514                         memreg = RPCRDMA_ALLPHYSICAL;
515 #else
516                         dprintk("RPC:       %s: MTHCAFMR registration "
517                                 "specified but not supported by adapter, "
518                                 "using slower RPCRDMA_REGISTER\n",
519                                 __func__);
520                         memreg = RPCRDMA_REGISTER;
521 #endif
522                 }
523                 break;
524         case RPCRDMA_FRMR:
525                 /* Requires both frmr reg and local dma lkey */
526                 if ((devattr.device_cap_flags &
527                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
528                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
529 #if RPCRDMA_PERSISTENT_REGISTRATION
530                         dprintk("RPC:       %s: FRMR registration "
531                                 "specified but not supported by adapter, "
532                                 "using riskier RPCRDMA_ALLPHYSICAL\n",
533                                 __func__);
534                         memreg = RPCRDMA_ALLPHYSICAL;
535 #else
536                         dprintk("RPC:       %s: FRMR registration "
537                                 "specified but not supported by adapter, "
538                                 "using slower RPCRDMA_REGISTER\n",
539                                 __func__);
540                         memreg = RPCRDMA_REGISTER;
541 #endif
542                 } else {
543                         /* Mind the ia limit on FRMR page list depth */
544                         ia->ri_max_frmr_depth = min_t(unsigned int,
545                                 RPCRDMA_MAX_DATA_SEGS,
546                                 devattr.max_fast_reg_page_list_len);
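                        /* The device caps how many pages a single
                         * fast-register WR may map; clamping here keeps the
                         * later ib_alloc_fast_reg_mr() and page list
                         * allocations within that limit.
                         */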
547                 }
548                 break;
549         }
550
551         /*
552          * Optionally obtain an underlying physical identity mapping in
553          * order to do a memory window-based bind. This base registration
554          * is protected from remote access - that is enabled only by binding
555          * for the specific bytes targeted during each RPC operation, and
556          * revoked after the corresponding completion similar to a storage
557          * adapter.
558          */
559         switch (memreg) {
560         case RPCRDMA_BOUNCEBUFFERS:
561         case RPCRDMA_REGISTER:
562         case RPCRDMA_FRMR:
563                 break;
564 #if RPCRDMA_PERSISTENT_REGISTRATION
565         case RPCRDMA_ALLPHYSICAL:
566                 mem_priv = IB_ACCESS_LOCAL_WRITE |
567                                 IB_ACCESS_REMOTE_WRITE |
568                                 IB_ACCESS_REMOTE_READ;
569                 goto register_setup;
570 #endif
571         case RPCRDMA_MEMWINDOWS_ASYNC:
572         case RPCRDMA_MEMWINDOWS:
573                 mem_priv = IB_ACCESS_LOCAL_WRITE |
574                                 IB_ACCESS_MW_BIND;
575                 goto register_setup;
576         case RPCRDMA_MTHCAFMR:
577                 if (ia->ri_have_dma_lkey)
578                         break;
579                 mem_priv = IB_ACCESS_LOCAL_WRITE;
580         register_setup:
581                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
582                 if (IS_ERR(ia->ri_bind_mem)) {
583                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
584                                 "phys register failed with %lX\n\t"
585                                 "Will continue with degraded performance\n",
586                                 __func__, PTR_ERR(ia->ri_bind_mem));
587                         memreg = RPCRDMA_REGISTER;
588                         ia->ri_bind_mem = NULL;
589                 }
590                 break;
591         default:
592                 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
593                                 __func__, memreg);
594                 rc = -EINVAL;
595                 goto out2;
596         }
597         dprintk("RPC:       %s: memory registration strategy is %d\n",
598                 __func__, memreg);
599
600         /* Else will do memory reg/dereg for each chunk */
601         ia->ri_memreg_strategy = memreg;
602
603         return 0;
604 out2:
605         rdma_destroy_id(ia->ri_id);
606         ia->ri_id = NULL;
607 out1:
608         return rc;
609 }
610
611 /*
612  * Clean up/close an IA.
613  *   o if event handles and PD have been initialized, free them.
614  *   o close the IA
615  */
616 void
617 rpcrdma_ia_close(struct rpcrdma_ia *ia)
618 {
619         int rc;
620
621         dprintk("RPC:       %s: entering\n", __func__);
622         if (ia->ri_bind_mem != NULL) {
623                 rc = ib_dereg_mr(ia->ri_bind_mem);
624                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
625                         __func__, rc);
626         }
627         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
628                 if (ia->ri_id->qp)
629                         rdma_destroy_qp(ia->ri_id);
630                 rdma_destroy_id(ia->ri_id);
631                 ia->ri_id = NULL;
632         }
633         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
634                 rc = ib_dealloc_pd(ia->ri_pd);
635                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
636                         __func__, rc);
637         }
638 }
639
640 /*
641  * Create unconnected endpoint.
642  */
643 int
644 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
645                                 struct rpcrdma_create_data_internal *cdata)
646 {
647         struct ib_device_attr devattr;
648         int rc, err;
649
650         rc = ib_query_device(ia->ri_id->device, &devattr);
651         if (rc) {
652                 dprintk("RPC:       %s: ib_query_device failed %d\n",
653                         __func__, rc);
654                 return rc;
655         }
656
657         /* check provider's send/recv wr limits */
658         if (cdata->max_requests > devattr.max_qp_wr)
659                 cdata->max_requests = devattr.max_qp_wr;
660
661         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
662         ep->rep_attr.qp_context = ep;
663         /* send_cq and recv_cq initialized below */
664         ep->rep_attr.srq = NULL;
665         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
666         switch (ia->ri_memreg_strategy) {
667         case RPCRDMA_FRMR: {
668                 int depth = 7;
669
670                 /* Add room for frmr register and invalidate WRs.
671                  * 1. FRMR reg WR for head
672                  * 2. FRMR invalidate WR for head
673                  * 3. N FRMR reg WRs for pagelist
674                  * 4. N FRMR invalidate WRs for pagelist
675                  * 5. FRMR reg WR for tail
676                  * 6. FRMR invalidate WR for tail
677                  * 7. The RDMA_SEND WR
678                  */
679
680                 /* Calculate N if the device max FRMR depth is smaller than
681                  * RPCRDMA_MAX_DATA_SEGS.
682                  */
683                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
684                         int delta = RPCRDMA_MAX_DATA_SEGS -
685                                     ia->ri_max_frmr_depth;
686
687                         do {
688                                 depth += 2; /* FRMR reg + invalidate */
689                                 delta -= ia->ri_max_frmr_depth;
690                         } while (delta > 0);
691
692                 }
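                /* Worked example with illustrative numbers: if
                 * RPCRDMA_MAX_DATA_SEGS were 64 and the device reported
                 * max_fast_reg_page_list_len of 16, delta would start at 48
                 * and the loop would run three times, giving a depth of
                 * 7 + 6 = 13 WRs per request.
                 */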
693                 ep->rep_attr.cap.max_send_wr *= depth;
694                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
695                         cdata->max_requests = devattr.max_qp_wr / depth;
696                         if (!cdata->max_requests)
697                                 return -EINVAL;
698                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
699                                                        depth;
700                 }
701                 break;
702         }
703         case RPCRDMA_MEMWINDOWS_ASYNC:
704         case RPCRDMA_MEMWINDOWS:
705                 /* Add room for mw_binds+unbinds - overkill! */
706                 ep->rep_attr.cap.max_send_wr++;
707                 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
708                 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
709                         return -EINVAL;
710                 break;
711         default:
712                 break;
713         }
714         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
715         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
716         ep->rep_attr.cap.max_recv_sge = 1;
717         ep->rep_attr.cap.max_inline_data = 0;
718         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
719         ep->rep_attr.qp_type = IB_QPT_RC;
720         ep->rep_attr.port_num = ~0;
721
722         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
723                 "iovs: send %d recv %d\n",
724                 __func__,
725                 ep->rep_attr.cap.max_send_wr,
726                 ep->rep_attr.cap.max_recv_wr,
727                 ep->rep_attr.cap.max_send_sge,
728                 ep->rep_attr.cap.max_recv_sge);
729
730         /* set trigger for requesting send completion */
731         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
732         switch (ia->ri_memreg_strategy) {
733         case RPCRDMA_MEMWINDOWS_ASYNC:
734         case RPCRDMA_MEMWINDOWS:
735                 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
736                 break;
737         default:
738                 break;
739         }
740         if (ep->rep_cqinit <= 2)
741                 ep->rep_cqinit = 0;
742         INIT_CQCOUNT(ep);
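        /* This count is decremented as sends are posted; when it reaches
         * zero a single signaled send is issued and the counter resets,
         * which is what keeps send completion traffic infrequent.
         */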
743         ep->rep_ia = ia;
744         init_waitqueue_head(&ep->rep_connect_wait);
745
746         /*
747          * Create a single cq for receive dto and mw_bind (only ever
748          * care about unbind, really). Send completions are suppressed.
749          * Use single threaded tasklet upcalls to maintain ordering.
750          */
751         ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
752                                   rpcrdma_cq_async_error_upcall, NULL,
753                                   ep->rep_attr.cap.max_recv_wr +
754                                   ep->rep_attr.cap.max_send_wr + 1, 0);
755         if (IS_ERR(ep->rep_cq)) {
756                 rc = PTR_ERR(ep->rep_cq);
757                 dprintk("RPC:       %s: ib_create_cq failed: %i\n",
758                         __func__, rc);
759                 goto out1;
760         }
761
762         rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
763         if (rc) {
764                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
765                         __func__, rc);
766                 goto out2;
767         }
768
769         ep->rep_attr.send_cq = ep->rep_cq;
770         ep->rep_attr.recv_cq = ep->rep_cq;
771
772         /* Initialize cma parameters */
773
774         /* RPC/RDMA does not use private data */
775         ep->rep_remote_cma.private_data = NULL;
776         ep->rep_remote_cma.private_data_len = 0;
777
778         /* Client offers RDMA Read but does not initiate */
779         ep->rep_remote_cma.initiator_depth = 0;
780         if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
781                 ep->rep_remote_cma.responder_resources = 0;
782         else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
783                 ep->rep_remote_cma.responder_resources = 32;
784         else
785                 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
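        /* responder_resources advertises how many concurrent incoming
         * RDMA Read requests this side will service; the CM carries it
         * as an 8-bit value, hence the "<= 255" note above.
         */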
786
787         ep->rep_remote_cma.retry_count = 7;
788         ep->rep_remote_cma.flow_control = 0;
789         ep->rep_remote_cma.rnr_retry_count = 0;
790
791         return 0;
792
793 out2:
794         err = ib_destroy_cq(ep->rep_cq);
795         if (err)
796                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
797                         __func__, err);
798 out1:
799         return rc;
800 }
801
802 /*
803  * rpcrdma_ep_destroy
804  *
805  * Disconnect and destroy endpoint. After this, the only
806  * valid operations on the ep are to free it (if dynamically
807  * allocated) or re-create it.
808  *
809  * The caller's error handling must be sure to not leak the endpoint
810  * if this function fails.
811  */
812 int
813 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
814 {
815         int rc;
816
817         dprintk("RPC:       %s: entering, connected is %d\n",
818                 __func__, ep->rep_connected);
819
820         if (ia->ri_id->qp) {
821                 rc = rpcrdma_ep_disconnect(ep, ia);
822                 if (rc)
823                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
824                                 " returned %i\n", __func__, rc);
825                 rdma_destroy_qp(ia->ri_id);
826                 ia->ri_id->qp = NULL;
827         }
828
829         /* padding - could be done in rpcrdma_buffer_destroy... */
830         if (ep->rep_pad_mr) {
831                 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
832                 ep->rep_pad_mr = NULL;
833         }
834
835         rpcrdma_clean_cq(ep->rep_cq);
836         rc = ib_destroy_cq(ep->rep_cq);
837         if (rc)
838                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
839                         __func__, rc);
840
841         return rc;
842 }
843
844 /*
845  * Connect unconnected endpoint.
846  */
847 int
848 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
849 {
850         struct rdma_cm_id *id;
851         int rc = 0;
852         int retry_count = 0;
853
854         if (ep->rep_connected != 0) {
855                 struct rpcrdma_xprt *xprt;
856 retry:
857                 rc = rpcrdma_ep_disconnect(ep, ia);
858                 if (rc && rc != -ENOTCONN)
859                         dprintk("RPC:       %s: rpcrdma_ep_disconnect"
860                                 " status %i\n", __func__, rc);
861                 rpcrdma_clean_cq(ep->rep_cq);
862
863                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
864                 id = rpcrdma_create_id(xprt, ia,
865                                 (struct sockaddr *)&xprt->rx_data.addr);
866                 if (IS_ERR(id)) {
867                         rc = PTR_ERR(id);
868                         goto out;
869                 }
870                 /* TEMP TEMP TEMP - fail if new device:
871                  * Deregister/remarshal *all* requests!
872                  * Close and recreate adapter, pd, etc!
873                  * Re-determine all attributes still sane!
874                  * More stuff I haven't thought of!
875                  * Rrrgh!
876                  */
877                 if (ia->ri_id->device != id->device) {
878                         printk("RPC:       %s: can't reconnect on "
879                                 "different device!\n", __func__);
880                         rdma_destroy_id(id);
881                         rc = -ENETDOWN;
882                         goto out;
883                 }
884                 /* END TEMP */
885                 rdma_destroy_qp(ia->ri_id);
886                 rdma_destroy_id(ia->ri_id);
887                 ia->ri_id = id;
888         }
889
890         rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
891         if (rc) {
892                 dprintk("RPC:       %s: rdma_create_qp failed %i\n",
893                         __func__, rc);
894                 goto out;
895         }
896
897 /* XXX Tavor device performs badly with 2K MTU! */
898 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
899         struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
900         if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
901             (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
902              pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
903                 struct ib_qp_attr attr = {
904                         .path_mtu = IB_MTU_1024
905                 };
906                 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
907         }
908 }
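        /* Note: failure to clamp the path MTU above is treated as
         * non-fatal; rc is simply overwritten by rdma_connect() below.
         */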
909
910         ep->rep_connected = 0;
911
912         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
913         if (rc) {
914                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
915                                 __func__, rc);
916                 goto out;
917         }
918
919         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
920
921         /*
922          * Check state. A non-peer reject indicates no listener
923          * (ECONNREFUSED), which may be a transient state. All
924          * others indicate a transport condition which has already
925          * undergone a best-effort recovery attempt.
926          */
927         if (ep->rep_connected == -ECONNREFUSED &&
928             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
929                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
930                 goto retry;
931         }
932         if (ep->rep_connected <= 0) {
933                 /* Sometimes, the only way to reliably connect to remote
934                  * CMs is to use the same nonzero values for ORD and IRD. */
935                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
936                     (ep->rep_remote_cma.responder_resources == 0 ||
937                      ep->rep_remote_cma.initiator_depth !=
938                                 ep->rep_remote_cma.responder_resources)) {
939                         if (ep->rep_remote_cma.responder_resources == 0)
940                                 ep->rep_remote_cma.responder_resources = 1;
941                         ep->rep_remote_cma.initiator_depth =
942                                 ep->rep_remote_cma.responder_resources;
943                         goto retry;
944                 }
945                 rc = ep->rep_connected;
946         } else {
947                 dprintk("RPC:       %s: connected\n", __func__);
948         }
949
950 out:
951         if (rc)
952                 ep->rep_connected = rc;
953         return rc;
954 }
955
956 /*
957  * rpcrdma_ep_disconnect
958  *
959  * This is separate from destroy to facilitate the ability
960  * to reconnect without recreating the endpoint.
961  *
962  * This call is not reentrant, and must not be made in parallel
963  * on the same endpoint.
964  */
965 int
966 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
967 {
968         int rc;
969
970         rpcrdma_clean_cq(ep->rep_cq);
971         rc = rdma_disconnect(ia->ri_id);
972         if (!rc) {
973                 /* returns without wait if not connected */
974                 wait_event_interruptible(ep->rep_connect_wait,
975                                                         ep->rep_connected != 1);
976                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
977                         (ep->rep_connected == 1) ? "still " : "dis");
978         } else {
979                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
980                 ep->rep_connected = rc;
981         }
982         return rc;
983 }
984
985 /*
986  * Initialize buffer memory
987  */
988 int
989 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
990         struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
991 {
992         char *p;
993         size_t len;
994         int i, rc;
995         struct rpcrdma_mw *r;
996
997         buf->rb_max_requests = cdata->max_requests;
998         spin_lock_init(&buf->rb_lock);
999         atomic_set(&buf->rb_credits, 1);
1000
1001         /* Need to allocate:
1002          *   1.  arrays for send and recv pointers
1003          *   2.  arrays of struct rpcrdma_req to fill in pointers
1004          *   3.  array of struct rpcrdma_rep for replies
1005          *   4.  padding, if any
1006          *   5.  mw's, fmr's or frmr's, if any
1007          * Send/recv buffers in req/rep need to be registered
1008          */
1009
1010         len = buf->rb_max_requests *
1011                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1012         len += cdata->padding;
1013         switch (ia->ri_memreg_strategy) {
1014         case RPCRDMA_FRMR:
1015                 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1016                                 sizeof(struct rpcrdma_mw);
1017                 break;
1018         case RPCRDMA_MTHCAFMR:
1019                 /* TBD we are perhaps overallocating here */
1020                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1021                                 sizeof(struct rpcrdma_mw);
1022                 break;
1023         case RPCRDMA_MEMWINDOWS_ASYNC:
1024         case RPCRDMA_MEMWINDOWS:
1025                 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1026                                 sizeof(struct rpcrdma_mw);
1027                 break;
1028         default:
1029                 break;
1030         }
1031
1032         /* allocate 1, 4 and 5 in one shot */
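        /* Pool layout, in order: the req pointer array, the rep pointer
         * array, the optional zeroed pad buffer, then the rpcrdma_mw array
         * sized according to the registration strategy above.
         */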
1033         p = kzalloc(len, GFP_KERNEL);
1034         if (p == NULL) {
1035                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1036                         __func__, len);
1037                 rc = -ENOMEM;
1038                 goto out;
1039         }
1040         buf->rb_pool = p;       /* for freeing it later */
1041
1042         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1043         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1044         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1045         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1046
1047         /*
1048          * Register the zeroed pad buffer, if any.
1049          */
1050         if (cdata->padding) {
1051                 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1052                                             &ep->rep_pad_mr, &ep->rep_pad);
1053                 if (rc)
1054                         goto out;
1055         }
1056         p += cdata->padding;
1057
1058         /*
1059          * Allocate the fmr's, or mw's for mw_bind chunk registration.
1060          * We "cycle" the mw's in order to minimize rkey reuse,
1061          * and also reduce unbind-to-bind collision.
1062          */
1063         INIT_LIST_HEAD(&buf->rb_mws);
1064         r = (struct rpcrdma_mw *)p;
1065         switch (ia->ri_memreg_strategy) {
1066         case RPCRDMA_FRMR:
1067                 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1068                         r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1069                                                 ia->ri_max_frmr_depth);
1070                         if (IS_ERR(r->r.frmr.fr_mr)) {
1071                                 rc = PTR_ERR(r->r.frmr.fr_mr);
1072                                 dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1073                                         " failed %i\n", __func__, rc);
1074                                 goto out;
1075                         }
1076                         r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1077                                                 ia->ri_id->device,
1078                                                 ia->ri_max_frmr_depth);
1079                         if (IS_ERR(r->r.frmr.fr_pgl)) {
1080                                 rc = PTR_ERR(r->r.frmr.fr_pgl);
1081                                 dprintk("RPC:       %s: "
1082                                         "ib_alloc_fast_reg_page_list "
1083                                         "failed %i\n", __func__, rc);
1084                                 goto out;
1085                         }
1086                         list_add(&r->mw_list, &buf->rb_mws);
1087                         ++r;
1088                 }
1089                 break;
1090         case RPCRDMA_MTHCAFMR:
1091                 /* TBD we are perhaps overallocating here */
1092                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1093                         static struct ib_fmr_attr fa =
1094                                 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
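                        /* i.e. max_pages, max_maps and page_shift, in
                         * struct ib_fmr_attr declaration order.
                         */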
1095                         r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1096                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1097                                 &fa);
1098                         if (IS_ERR(r->r.fmr)) {
1099                                 rc = PTR_ERR(r->r.fmr);
1100                                 dprintk("RPC:       %s: ib_alloc_fmr"
1101                                         " failed %i\n", __func__, rc);
1102                                 goto out;
1103                         }
1104                         list_add(&r->mw_list, &buf->rb_mws);
1105                         ++r;
1106                 }
1107                 break;
1108         case RPCRDMA_MEMWINDOWS_ASYNC:
1109         case RPCRDMA_MEMWINDOWS:
1110                 /* Allocate one extra request's worth, for full cycling */
1111                 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1112                         r->r.mw = ib_alloc_mw(ia->ri_pd, IB_MW_TYPE_1);
1113                         if (IS_ERR(r->r.mw)) {
1114                                 rc = PTR_ERR(r->r.mw);
1115                                 dprintk("RPC:       %s: ib_alloc_mw"
1116                                         " failed %i\n", __func__, rc);
1117                                 goto out;
1118                         }
1119                         list_add(&r->mw_list, &buf->rb_mws);
1120                         ++r;
1121                 }
1122                 break;
1123         default:
1124                 break;
1125         }
1126
1127         /*
1128          * Allocate/init the request/reply buffers. Doing this
1129          * using kmalloc for now -- one for each buf.
1130          */
1131         for (i = 0; i < buf->rb_max_requests; i++) {
1132                 struct rpcrdma_req *req;
1133                 struct rpcrdma_rep *rep;
1134
1135                 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1136                 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1137                 /* Typical ~2400b, so rounding up saves work later */
1138                 if (len < 4096)
1139                         len = 4096;
1140                 req = kmalloc(len, GFP_KERNEL);
1141                 if (req == NULL) {
1142                         dprintk("RPC:       %s: request buffer %d alloc"
1143                                 " failed\n", __func__, i);
1144                         rc = -ENOMEM;
1145                         goto out;
1146                 }
1147                 memset(req, 0, sizeof(struct rpcrdma_req));
1148                 buf->rb_send_bufs[i] = req;
1149                 buf->rb_send_bufs[i]->rl_buffer = buf;
1150
1151                 rc = rpcrdma_register_internal(ia, req->rl_base,
1152                                 len - offsetof(struct rpcrdma_req, rl_base),
1153                                 &buf->rb_send_bufs[i]->rl_handle,
1154                                 &buf->rb_send_bufs[i]->rl_iov);
1155                 if (rc)
1156                         goto out;
1157
1158                 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1159
1160                 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1161                 rep = kmalloc(len, GFP_KERNEL);
1162                 if (rep == NULL) {
1163                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1164                                 __func__, i);
1165                         rc = -ENOMEM;
1166                         goto out;
1167                 }
1168                 memset(rep, 0, sizeof(struct rpcrdma_rep));
1169                 buf->rb_recv_bufs[i] = rep;
1170                 buf->rb_recv_bufs[i]->rr_buffer = buf;
1171                 init_waitqueue_head(&rep->rr_unbind);
1172
1173                 rc = rpcrdma_register_internal(ia, rep->rr_base,
1174                                 len - offsetof(struct rpcrdma_rep, rr_base),
1175                                 &buf->rb_recv_bufs[i]->rr_handle,
1176                                 &buf->rb_recv_bufs[i]->rr_iov);
1177                 if (rc)
1178                         goto out;
1179
1180         }
1181         dprintk("RPC:       %s: max_requests %d\n",
1182                 __func__, buf->rb_max_requests);
1183         /* done */
1184         return 0;
1185 out:
1186         rpcrdma_buffer_destroy(buf);
1187         return rc;
1188 }
1189
1190 /*
1191  * Unregister and destroy buffer memory. Need to deal with
1192  * partial initialization, so it's callable from failed create.
1193  * Must be called before destroying endpoint, as registrations
1194  * reference it.
1195  */
1196 void
1197 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1198 {
1199         int rc, i;
1200         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1201         struct rpcrdma_mw *r;
1202
1203         /* clean up in reverse order from create
1204          *   1.  recv mr memory (mr free, then kfree)
1205          *   1a. bind mw memory
1206          *   2.  send mr memory (mr free, then kfree)
1207          *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1208          *   4.  arrays
1209          */
1210         dprintk("RPC:       %s: entering\n", __func__);
1211
1212         for (i = 0; i < buf->rb_max_requests; i++) {
1213                 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1214                         rpcrdma_deregister_internal(ia,
1215                                         buf->rb_recv_bufs[i]->rr_handle,
1216                                         &buf->rb_recv_bufs[i]->rr_iov);
1217                         kfree(buf->rb_recv_bufs[i]);
1218                 }
1219                 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1220                         while (!list_empty(&buf->rb_mws)) {
1221                                 r = list_entry(buf->rb_mws.next,
1222                                         struct rpcrdma_mw, mw_list);
1223                                 list_del(&r->mw_list);
1224                                 switch (ia->ri_memreg_strategy) {
1225                                 case RPCRDMA_FRMR:
1226                                         rc = ib_dereg_mr(r->r.frmr.fr_mr);
1227                                         if (rc)
1228                                                 dprintk("RPC:       %s:"
1229                                                         " ib_dereg_mr"
1230                                                         " failed %i\n",
1231                                                         __func__, rc);
1232                                         ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1233                                         break;
1234                                 case RPCRDMA_MTHCAFMR:
1235                                         rc = ib_dealloc_fmr(r->r.fmr);
1236                                         if (rc)
1237                                                 dprintk("RPC:       %s:"
1238                                                         " ib_dealloc_fmr"
1239                                                         " failed %i\n",
1240                                                         __func__, rc);
1241                                         break;
1242                                 case RPCRDMA_MEMWINDOWS_ASYNC:
1243                                 case RPCRDMA_MEMWINDOWS:
1244                                         rc = ib_dealloc_mw(r->r.mw);
1245                                         if (rc)
1246                                                 dprintk("RPC:       %s:"
1247                                                         " ib_dealloc_mw"
1248                                                         " failed %i\n",
1249                                                         __func__, rc);
1250                                         break;
1251                                 default:
1252                                         break;
1253                                 }
1254                         }
1255                         rpcrdma_deregister_internal(ia,
1256                                         buf->rb_send_bufs[i]->rl_handle,
1257                                         &buf->rb_send_bufs[i]->rl_iov);
1258                         kfree(buf->rb_send_bufs[i]);
1259                 }
1260         }
1261
1262         kfree(buf->rb_pool);
1263 }
1264
1265 /*
1266  * Get a set of request/reply buffers.
1267  *
1268  * Reply buffer (if needed) is attached to send buffer upon return.
1269  * Rule:
1270  *    rb_send_index and rb_recv_index MUST always be pointing to the
1271  *    *next* available buffer (non-NULL). They are incremented after
1272  *    removing buffers, and decremented *before* returning them.
1273  */
1274 struct rpcrdma_req *
1275 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1276 {
1277         struct rpcrdma_req *req;
1278         unsigned long flags;
1279         int i;
1280         struct rpcrdma_mw *r;
1281
1282         spin_lock_irqsave(&buffers->rb_lock, flags);
1283         if (buffers->rb_send_index == buffers->rb_max_requests) {
1284                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1285                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1286                 return ((struct rpcrdma_req *)NULL);
1287         }
1288
1289         req = buffers->rb_send_bufs[buffers->rb_send_index];
1290         if (buffers->rb_send_index < buffers->rb_recv_index) {
1291                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1292                         __func__,
1293                         buffers->rb_recv_index - buffers->rb_send_index);
1294                 req->rl_reply = NULL;
1295         } else {
1296                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1297                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1298         }
1299         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1300         if (!list_empty(&buffers->rb_mws)) {
1301                 i = RPCRDMA_MAX_SEGS - 1;
1302                 do {
1303                         r = list_entry(buffers->rb_mws.next,
1304                                         struct rpcrdma_mw, mw_list);
1305                         list_del(&r->mw_list);
1306                         req->rl_segments[i].mr_chunk.rl_mw = r;
1307                 } while (--i >= 0);
1308         }
1309         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1310         return req;
1311 }
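/*
 * Typical caller pattern (an illustrative sketch; the transport code
 * pairs these calls around each RPC):
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (req == NULL)
 *		... back off and retry later ...
 *	... marshal and post the request, await the reply ...
 *	rpcrdma_buffer_put(req);
 */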
1312
1313 /*
1314  * Put request/reply buffers back into pool.
1315  * Pre-decrement counter/array index.
1316  */
1317 void
1318 rpcrdma_buffer_put(struct rpcrdma_req *req)
1319 {
1320         struct rpcrdma_buffer *buffers = req->rl_buffer;
1321         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1322         int i;
1323         unsigned long flags;
1324
1325         BUG_ON(req->rl_nchunks != 0);
1326         spin_lock_irqsave(&buffers->rb_lock, flags);
1327         buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1328         req->rl_niovs = 0;
1329         if (req->rl_reply) {
1330                 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1331                 init_waitqueue_head(&req->rl_reply->rr_unbind);
1332                 req->rl_reply->rr_func = NULL;
1333                 req->rl_reply = NULL;
1334         }
1335         switch (ia->ri_memreg_strategy) {
1336         case RPCRDMA_FRMR:
1337         case RPCRDMA_MTHCAFMR:
1338         case RPCRDMA_MEMWINDOWS_ASYNC:
1339         case RPCRDMA_MEMWINDOWS:
1340                 /*
1341                  * Cycle mw's back in reverse order, and "spin" them.
1342                  * This delays and scrambles reuse as much as possible.
1343                  */
1344                 i = 1;
1345                 do {
1346                         struct rpcrdma_mw **mw;
1347                         mw = &req->rl_segments[i].mr_chunk.rl_mw;
1348                         list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1349                         *mw = NULL;
1350                 } while (++i < RPCRDMA_MAX_SEGS);
1351                 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1352                                         &buffers->rb_mws);
1353                 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1354                 break;
1355         default:
1356                 break;
1357         }
1358         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1359 }
1360
1361 /*
1362  * Recover reply buffers from pool.
1363  * This happens when recovering from error conditions.
1364  * Post-increment counter/array index.
1365  */
1366 void
1367 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1368 {
1369         struct rpcrdma_buffer *buffers = req->rl_buffer;
1370         unsigned long flags;
1371
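        /*
         * A request with rl_iov.length == 0 came from the special path
         * in xprt_rdma_allocate(); its rl_buffer holds a pointer to the
         * originating rpcrdma_req rather than to the buffer pool, so
         * chase that pointer to reach the real pool.
         */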
1372         if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
1373                 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1374         spin_lock_irqsave(&buffers->rb_lock, flags);
1375         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1376                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1377                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1378         }
1379         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1380 }
1381
1382 /*
1383  * Put reply buffers back into pool when not attached to
1384  * request. This happens in error conditions, and when
1385  * aborting unbinds. Pre-decrement counter/array index.
1386  */
1387 void
1388 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1389 {
1390         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1391         unsigned long flags;
1392
1393         rep->rr_func = NULL;
1394         spin_lock_irqsave(&buffers->rb_lock, flags);
1395         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1396         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1397 }
1398
1399 /*
1400  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1401  */
1402
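/*
 * Typical caller pattern -- an illustrative sketch only, with
 * hypothetical local variable names:
 *
 *	buf = kmalloc(len, GFP_KERNEL);
 *	rc = rpcrdma_register_internal(ia, buf, len, &mr, &iov);
 *	if (rc)
 *		goto out_free;
 *	...use iov as a local sge for posted sends/receives...
 *	rpcrdma_deregister_internal(ia, mr, &iov);
 *	kfree(buf);
 */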
1403 int
1404 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1405                                 struct ib_mr **mrp, struct ib_sge *iov)
1406 {
1407         struct ib_phys_buf ipb;
1408         struct ib_mr *mr;
1409         int rc;
1410
1411         /*
1412          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1413          */
1414         iov->addr = ib_dma_map_single(ia->ri_id->device,
1415                         va, len, DMA_BIDIRECTIONAL);
1416         iov->length = len;
1417
1418         if (ia->ri_have_dma_lkey) {
1419                 *mrp = NULL;
1420                 iov->lkey = ia->ri_dma_lkey;
1421                 return 0;
1422         } else if (ia->ri_bind_mem != NULL) {
1423                 *mrp = NULL;
1424                 iov->lkey = ia->ri_bind_mem->lkey;
1425                 return 0;
1426         }
1427
1428         ipb.addr = iov->addr;
1429         ipb.size = iov->length;
1430         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1431                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1432
1433         dprintk("RPC:       %s: phys convert: 0x%llx "
1434                         "registered 0x%llx length %d\n",
1435                         __func__, (unsigned long long)ipb.addr,
1436                         (unsigned long long)iov->addr, len);
1437
1438         if (IS_ERR(mr)) {
1439                 *mrp = NULL;
1440                 rc = PTR_ERR(mr);
1441                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1442         } else {
1443                 *mrp = mr;
1444                 iov->lkey = mr->lkey;
1445                 rc = 0;
1446         }
1447
1448         return rc;
1449 }
1450
1451 int
1452 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1453                                 struct ib_mr *mr, struct ib_sge *iov)
1454 {
1455         int rc;
1456
1457         ib_dma_unmap_single(ia->ri_id->device,
1458                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1459
1460         if (mr == NULL)
1461                 return 0;
1462
1463         rc = ib_dereg_mr(mr);
1464         if (rc)
1465                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1466         return rc;
1467 }
1468
1469 /*
1470  * Wrappers for chunk registration, shared by read/write chunk code.
1471  */
1472
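/*
 * rpcrdma_map_one()/rpcrdma_unmap_one() DMA-map and unmap a single
 * rpcrdma_mr_seg, choosing the page or kernel-virtual mapping as
 * appropriate, and cache the dma address, length and direction in
 * the segment itself.
 */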
1473 static void
1474 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1475 {
1476         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1477         seg->mr_dmalen = seg->mr_len;
1478         if (seg->mr_page)
1479                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1480                                 seg->mr_page, offset_in_page(seg->mr_offset),
1481                                 seg->mr_dmalen, seg->mr_dir);
1482         else
1483                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1484                                 seg->mr_offset,
1485                                 seg->mr_dmalen, seg->mr_dir);
1486         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1487                 dprintk("RPC:       %s: dma mapping error: mr_dma %llx mr_offset %p mr_dmalen %zu\n",
1488                         __func__,
1489                         (unsigned long long)seg->mr_dma,
1490                         seg->mr_offset, seg->mr_dmalen);
1491         }
1492 }
1493
1494 static void
1495 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1496 {
1497         if (seg->mr_page)
1498                 ib_dma_unmap_page(ia->ri_id->device,
1499                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1500         else
1501                 ib_dma_unmap_single(ia->ri_id->device,
1502                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1503 }
1504
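/*
 * Register a chunk using a Fast Registration Memory Region (FRMR).
 *
 * Up to ia->ri_max_frmr_depth segments are mapped into the FRMR's
 * page list, then an IB_WR_FAST_REG_MR work request is posted --
 * preceded by an IB_WR_LOCAL_INV if the FRMR was left valid by a
 * previous registration.  The rkey is bumped before each reuse.
 */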
1505 static int
1506 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1507                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1508                         struct rpcrdma_xprt *r_xprt)
1509 {
1510         struct rpcrdma_mr_seg *seg1 = seg;
1511         struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1512
1513         u8 key;
1514         int len, pageoff;
1515         int i, rc;
1516         int seg_len;
1517         u64 pa;
1518         int page_no;
1519
1520         pageoff = offset_in_page(seg1->mr_offset);
1521         seg1->mr_offset -= pageoff;     /* start of page */
1522         seg1->mr_len += pageoff;
1523         len = -pageoff;
1524         if (*nsegs > ia->ri_max_frmr_depth)
1525                 *nsegs = ia->ri_max_frmr_depth;
1526         for (page_no = i = 0; i < *nsegs;) {
1527                 rpcrdma_map_one(ia, seg, writing);
1528                 pa = seg->mr_dma;
1529                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1530                         seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1531                                 page_list[page_no++] = pa;
1532                         pa += PAGE_SIZE;
1533                 }
1534                 len += seg->mr_len;
1535                 ++seg;
1536                 ++i;
1537                 /* Check for holes: stop unless this seg ends, and the next begins, on a page boundary */
1538                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1539                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1540                         break;
1541         }
1542         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1543                 __func__, seg1->mr_chunk.rl_mw, i);
1544
1545         if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1546                 dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1547                         __func__,
1548                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1549                 /* Invalidate before using. */
1550                 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1551                 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1552                 invalidate_wr.next = &frmr_wr;
1553                 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1554                 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1555                 invalidate_wr.ex.invalidate_rkey =
1556                         seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1557                 DECR_CQCOUNT(&r_xprt->rx_ep);
1558                 post_wr = &invalidate_wr;
1559         } else
1560                 post_wr = &frmr_wr;
1561
1562         /* Bump the key */
1563         key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1564         ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1565
1566         /* Prepare FRMR WR */
1567         memset(&frmr_wr, 0, sizeof frmr_wr);
1568         frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1569         frmr_wr.opcode = IB_WR_FAST_REG_MR;
1570         frmr_wr.send_flags = IB_SEND_SIGNALED;
1571         frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1572         frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1573         frmr_wr.wr.fast_reg.page_list_len = page_no;
1574         frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1575         frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1576         BUG_ON(frmr_wr.wr.fast_reg.length < len);
1577         frmr_wr.wr.fast_reg.access_flags = (writing ?
1578                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1579                                 IB_ACCESS_REMOTE_READ);
1580         frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1581         DECR_CQCOUNT(&r_xprt->rx_ep);
1582
1583         rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1584
1585         if (rc) {
1586                 dprintk("RPC:       %s: failed ib_post_send for register,"
1587                         " status %i\n", __func__, rc);
1588                 while (i--)
1589                         rpcrdma_unmap_one(ia, --seg);
1590         } else {
1591                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1592                 seg1->mr_base = seg1->mr_dma + pageoff;
1593                 seg1->mr_nsegs = i;
1594                 seg1->mr_len = len;
1595         }
1596         *nsegs = i;
1597         return rc;
1598 }
1599
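/*
 * Invalidate a chunk registered via FRMR: unmap its segments, then
 * post an IB_WR_LOCAL_INV work request for the FRMR's rkey.
 */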
1600 static int
1601 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1602                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1603 {
1604         struct rpcrdma_mr_seg *seg1 = seg;
1605         struct ib_send_wr invalidate_wr, *bad_wr;
1606         int rc;
1607
1608         while (seg1->mr_nsegs--)
1609                 rpcrdma_unmap_one(ia, seg++);
1610
1611         memset(&invalidate_wr, 0, sizeof invalidate_wr);
1612         invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1613         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1614         invalidate_wr.send_flags = IB_SEND_SIGNALED;
1615         invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1616         DECR_CQCOUNT(&r_xprt->rx_ep);
1617
1618         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1619         if (rc)
1620                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1621                         " status %i\n", __func__, rc);
1622         return rc;
1623 }
1624
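/*
 * Register a chunk using a Fast Memory Region (FMR): gather up to
 * RPCRDMA_MAX_DATA_SEGS page-aligned segments and map them with a
 * single ib_map_phys_fmr() call.
 */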
1625 static int
1626 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1627                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1628 {
1629         struct rpcrdma_mr_seg *seg1 = seg;
1630         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1631         int len, pageoff, i, rc;
1632
1633         pageoff = offset_in_page(seg1->mr_offset);
1634         seg1->mr_offset -= pageoff;     /* start of page */
1635         seg1->mr_len += pageoff;
1636         len = -pageoff;
1637         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1638                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1639         for (i = 0; i < *nsegs;) {
1640                 rpcrdma_map_one(ia, seg, writing);
1641                 physaddrs[i] = seg->mr_dma;
1642                 len += seg->mr_len;
1643                 ++seg;
1644                 ++i;
1645                 /* Check for holes: stop unless this seg ends, and the next begins, on a page boundary */
1646                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1647                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1648                         break;
1649         }
1650         rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1651                                 physaddrs, i, seg1->mr_dma);
1652         if (rc) {
1653                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
1654                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
1655                         len, (unsigned long long)seg1->mr_dma,
1656                         pageoff, i, rc);
1657                 while (i--)
1658                         rpcrdma_unmap_one(ia, --seg);
1659         } else {
1660                 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1661                 seg1->mr_base = seg1->mr_dma + pageoff;
1662                 seg1->mr_nsegs = i;
1663                 seg1->mr_len = len;
1664         }
1665         *nsegs = i;
1666         return rc;
1667 }
1668
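/*
 * Unmap a chunk registered via FMR: ib_unmap_fmr() first, then undo
 * the DMA mapping of each segment.
 */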
1669 static int
1670 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1671                         struct rpcrdma_ia *ia)
1672 {
1673         struct rpcrdma_mr_seg *seg1 = seg;
1674         LIST_HEAD(l);
1675         int rc;
1676
1677         list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1678         rc = ib_unmap_fmr(&l);
1679         while (seg1->mr_nsegs--)
1680                 rpcrdma_unmap_one(ia, seg++);
1681         if (rc)
1682                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
1683                         " status %i\n", __func__, rc);
1684         return rc;
1685 }
1686
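/*
 * Register a chunk by binding a memory window over it.  A memory
 * window covers only a single segment per bind, so *nsegs is forced
 * to 1 here.
 */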
1687 static int
1688 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1689                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1690                         struct rpcrdma_xprt *r_xprt)
1691 {
1692         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1693                                   IB_ACCESS_REMOTE_READ);
1694         struct ib_mw_bind param;
1695         int rc;
1696
1697         *nsegs = 1;
1698         rpcrdma_map_one(ia, seg, writing);
1699         param.bind_info.mr = ia->ri_bind_mem;
1700         param.wr_id = 0ULL;     /* no send cookie */
1701         param.bind_info.addr = seg->mr_dma;
1702         param.bind_info.length = seg->mr_len;
1703         param.send_flags = 0;
1704         param.bind_info.mw_access_flags = mem_priv;
1705
1706         DECR_CQCOUNT(&r_xprt->rx_ep);
1707         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1708         if (rc) {
1709                 dprintk("RPC:       %s: failed ib_bind_mw "
1710                         "%u@0x%llx status %i\n",
1711                         __func__, seg->mr_len,
1712                         (unsigned long long)seg->mr_dma, rc);
1713                 rpcrdma_unmap_one(ia, seg);
1714         } else {
1715                 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1716                 seg->mr_base = param.bind_info.addr;
1717                 seg->mr_nsegs = 1;
1718         }
1719         return rc;
1720 }
1721
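/*
 * Unbind a memory window (a zero-length bind).  When a reply context
 * is passed in via *r, the unbind is posted signaled and the reply
 * callback is deferred until the unbind completes; otherwise the
 * unbind is unsignaled.
 */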
1722 static int
1723 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1724                         struct rpcrdma_ia *ia,
1725                         struct rpcrdma_xprt *r_xprt, void **r)
1726 {
1727         struct ib_mw_bind param;
1728         LIST_HEAD(l);
1729         int rc;
1730
1731         BUG_ON(seg->mr_nsegs != 1);
1732         param.bind_info.mr = ia->ri_bind_mem;
1733         param.bind_info.addr = 0ULL;    /* unbind */
1734         param.bind_info.length = 0;
1735         param.bind_info.mw_access_flags = 0;
1736         if (*r) {
1737                 param.wr_id = (u64) (unsigned long) *r;
1738                 param.send_flags = IB_SEND_SIGNALED;
1739                 INIT_CQCOUNT(&r_xprt->rx_ep);
1740         } else {
1741                 param.wr_id = 0ULL;
1742                 param.send_flags = 0;
1743                 DECR_CQCOUNT(&r_xprt->rx_ep);
1744         }
1745         rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1746         rpcrdma_unmap_one(ia, seg);
1747         if (rc)
1748                 dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1749                         " status %i\n", __func__, rc);
1750         else
1751                 *r = NULL;      /* will upcall on completion */
1752         return rc;
1753 }
1754
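/*
 * Default registration: register up to RPCRDMA_MAX_DATA_SEGS physical
 * buffers with ib_reg_phys_mr() for each chunk.
 */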
1755 static int
1756 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1757                         int *nsegs, int writing, struct rpcrdma_ia *ia)
1758 {
1759         int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1760                                   IB_ACCESS_REMOTE_READ);
1761         struct rpcrdma_mr_seg *seg1 = seg;
1762         struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1763         int len, i, rc = 0;
1764
1765         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1766                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1767         for (len = 0, i = 0; i < *nsegs;) {
1768                 rpcrdma_map_one(ia, seg, writing);
1769                 ipb[i].addr = seg->mr_dma;
1770                 ipb[i].size = seg->mr_len;
1771                 len += seg->mr_len;
1772                 ++seg;
1773                 ++i;
1774                 /* Check for holes: stop unless this seg ends, and the next begins, on a page boundary */
1775                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1776                     offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1777                         break;
1778         }
1779         seg1->mr_base = seg1->mr_dma;
1780         seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1781                                 ipb, i, mem_priv, &seg1->mr_base);
1782         if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1783                 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1784                 dprintk("RPC:       %s: failed ib_reg_phys_mr "
1785                         "%u@0x%llx (%d)... status %i\n",
1786                         __func__, len,
1787                         (unsigned long long)seg1->mr_dma, i, rc);
1788                 while (i--)
1789                         rpcrdma_unmap_one(ia, --seg);
1790         } else {
1791                 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1792                 seg1->mr_nsegs = i;
1793                 seg1->mr_len = len;
1794         }
1795         *nsegs = i;
1796         return rc;
1797 }
1798
1799 static int
1800 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1801                         struct rpcrdma_ia *ia)
1802 {
1803         struct rpcrdma_mr_seg *seg1 = seg;
1804         int rc;
1805
1806         rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1807         seg1->mr_chunk.rl_mr = NULL;
1808         while (seg1->mr_nsegs--)
1809                 rpcrdma_unmap_one(ia, seg++);
1810         if (rc)
1811                 dprintk("RPC:       %s: failed ib_dereg_mr,"
1812                         " status %i\n", __func__, rc);
1813         return rc;
1814 }
1815
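/*
 * rpcrdma_register_external() registers the memory described by @seg
 * using the interface's configured memory registration strategy and
 * returns the number of segments covered, or -1 on failure.
 * rpcrdma_deregister_external() undoes the registration.
 *
 * Illustrative caller sketch (variable names are hypothetical):
 *
 *	n = rpcrdma_register_external(seg, nsegs, writing, r_xprt);
 *	if (n < 0)
 *		return -EIO;
 *	...advertise seg->mr_rkey, seg->mr_base, seg->mr_len...
 *	rpcrdma_deregister_external(seg, r_xprt, NULL);
 */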
1816 int
1817 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1818                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1819 {
1820         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1821         int rc = 0;
1822
1823         switch (ia->ri_memreg_strategy) {
1824
1825 #if RPCRDMA_PERSISTENT_REGISTRATION
1826         case RPCRDMA_ALLPHYSICAL:
1827                 rpcrdma_map_one(ia, seg, writing);
1828                 seg->mr_rkey = ia->ri_bind_mem->rkey;
1829                 seg->mr_base = seg->mr_dma;
1830                 seg->mr_nsegs = 1;
1831                 nsegs = 1;
1832                 break;
1833 #endif
1834
1835         /* Registration using frmr registration */
1836         case RPCRDMA_FRMR:
1837                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1838                 break;
1839
1840         /* Registration using fmr memory registration */
1841         case RPCRDMA_MTHCAFMR:
1842                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1843                 break;
1844
1845         /* Registration using memory windows */
1846         case RPCRDMA_MEMWINDOWS_ASYNC:
1847         case RPCRDMA_MEMWINDOWS:
1848                 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1849                 break;
1850
1851         /* Default registration each time */
1852         default:
1853                 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1854                 break;
1855         }
1856         if (rc)
1857                 return -1;
1858
1859         return nsegs;
1860 }
1861
1862 int
1863 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1864                 struct rpcrdma_xprt *r_xprt, void *r)
1865 {
1866         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1867         int nsegs = seg->mr_nsegs, rc;
1868
1869         switch (ia->ri_memreg_strategy) {
1870
1871 #if RPCRDMA_PERSISTENT_REGISTRATION
1872         case RPCRDMA_ALLPHYSICAL:
1873                 BUG_ON(nsegs != 1);
1874                 rpcrdma_unmap_one(ia, seg);
1875                 rc = 0;
1876                 break;
1877 #endif
1878
1879         case RPCRDMA_FRMR:
1880                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1881                 break;
1882
1883         case RPCRDMA_MTHCAFMR:
1884                 rc = rpcrdma_deregister_fmr_external(seg, ia);
1885                 break;
1886
1887         case RPCRDMA_MEMWINDOWS_ASYNC:
1888         case RPCRDMA_MEMWINDOWS:
1889                 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1890                 break;
1891
1892         default:
1893                 rc = rpcrdma_deregister_default_external(seg, ia);
1894                 break;
1895         }
1896         if (r) {
1897                 struct rpcrdma_rep *rep = r;
1898                 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1899                 rep->rr_func = NULL;
1900                 func(rep);      /* dereg done, callback now */
1901         }
1902         return nsegs;
1903 }
1904
1905 /*
1906  * Prepost any receive buffer, then post send.
1907  *
1908  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1909  */
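/*
 * The attached reply buffer, if any, is posted before the send so a
 * receive is already available when the peer replies.  Send
 * completions are requested only periodically (see DECR_CQCOUNT and
 * INIT_CQCOUNT below) so the provider still reaps send requests.
 */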
1910 int
1911 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1912                 struct rpcrdma_ep *ep,
1913                 struct rpcrdma_req *req)
1914 {
1915         struct ib_send_wr send_wr, *send_wr_fail;
1916         struct rpcrdma_rep *rep = req->rl_reply;
1917         int rc;
1918
1919         if (rep) {
1920                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1921                 if (rc)
1922                         goto out;
1923                 req->rl_reply = NULL;
1924         }
1925
1926         send_wr.next = NULL;
1927         send_wr.wr_id = 0ULL;   /* no send cookie */
1928         send_wr.sg_list = req->rl_send_iov;
1929         send_wr.num_sge = req->rl_niovs;
1930         send_wr.opcode = IB_WR_SEND;
1931         if (send_wr.num_sge == 4)       /* sge[2] is the constant pad and needs no sync */
1932                 ib_dma_sync_single_for_device(ia->ri_id->device,
1933                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1934                         DMA_TO_DEVICE);
1935         ib_dma_sync_single_for_device(ia->ri_id->device,
1936                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1937                 DMA_TO_DEVICE);
1938         ib_dma_sync_single_for_device(ia->ri_id->device,
1939                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1940                 DMA_TO_DEVICE);
1941
1942         if (DECR_CQCOUNT(ep) > 0)
1943                 send_wr.send_flags = 0;
1944         else { /* Provider must take a send completion every now and then */
1945                 INIT_CQCOUNT(ep);
1946                 send_wr.send_flags = IB_SEND_SIGNALED;
1947         }
1948
1949         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1950         if (rc)
1951                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1952                         rc);
1953 out:
1954         return rc;
1955 }
1956
1957 /*
1958  * (Re)post a receive buffer.
1959  */
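/*
 * The rep pointer rides in wr_id so the receive completion handler
 * can recover it directly.
 */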
1960 int
1961 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1962                      struct rpcrdma_ep *ep,
1963                      struct rpcrdma_rep *rep)
1964 {
1965         struct ib_recv_wr recv_wr, *recv_wr_fail;
1966         int rc;
1967
1968         recv_wr.next = NULL;
1969         recv_wr.wr_id = (u64) (unsigned long) rep;
1970         recv_wr.sg_list = &rep->rr_iov;
1971         recv_wr.num_sge = 1;
1972
1973         ib_dma_sync_single_for_cpu(ia->ri_id->device,
1974                 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1975
1976         DECR_CQCOUNT(ep);
1977         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1978
1979         if (rc)
1980                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1981                         rc);
1982         return rc;
1983 }