2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
43 * Encapsulates the major functions managing:
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
54 #include "xprt_rdma.h"
60 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
61 # define RPCDBG_FACILITY RPCDBG_TRANS
64 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
65 static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
72 * Handle replies in tasklet context, using a single, global list.
73 * The rdma tasklet function simply turns around and calls the reply
74 * handler for each rep on the list.
77 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
78 static LIST_HEAD(rpcrdma_tasklets_g);
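/*
 * Reply dispatch happens in two stages: the receive-CQ upcall runs in
 * interrupt context and only moves completed rpcrdma_rep structures onto
 * rpcrdma_tasklets_g; the tasklet below then walks that list in softirq
 * context, invoking each reply's rr_func handler, or returning the
 * buffer to the pool when no handler is set.
 */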
81 rpcrdma_run_tasklet(unsigned long data)
83 struct rpcrdma_rep *rep;
84 void (*func)(struct rpcrdma_rep *);
88 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
89 while (!list_empty(&rpcrdma_tasklets_g)) {
90 rep = list_entry(rpcrdma_tasklets_g.next,
91 struct rpcrdma_rep, rr_list);
92 list_del(&rep->rr_list);
95 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 rpcrdma_recv_buffer_put(rep);
102 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
104 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
107 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
109 static const char * const async_event[] = {
114 "communication established",
115 "send queue drained",
116 "path migration successful",
118 "device fatal error",
131 #define ASYNC_MSG(status) \
132 ((status) < ARRAY_SIZE(async_event) ? \
133 async_event[(status)] : "unknown async error")
136 rpcrdma_schedule_tasklet(struct list_head *sched_list)
140 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
141 list_splice_tail(sched_list, &rpcrdma_tasklets_g);
142 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
143 tasklet_schedule(&rpcrdma_tasklet_g);
147 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
149 struct rpcrdma_ep *ep = context;
151 pr_err("RPC: %s: %s on device %s ep %p\n",
152 __func__, ASYNC_MSG(event->event),
153 event->device->name, context);
154 if (ep->rep_connected == 1) {
155 ep->rep_connected = -EIO;
157 wake_up_all(&ep->rep_connect_wait);
162 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
164 struct rpcrdma_ep *ep = context;
166 pr_err("RPC: %s: %s on device %s ep %p\n",
167 __func__, ASYNC_MSG(event->event),
168 event->device->name, context);
169 if (ep->rep_connected == 1) {
170 ep->rep_connected = -EIO;
172 wake_up_all(&ep->rep_connect_wait);
176 static const char * const wc_status[] = {
178 "local length error",
179 "local QP operation error",
180 "local EE context operation error",
181 "local protection error",
183 "memory management operation error",
184 "bad response error",
185 "local access error",
186 "remote invalid request error",
187 "remote access error",
188 "remote operation error",
189 "transport retry counter exceeded",
190 "RNR retrycounter exceeded",
191 "local RDD violation error",
192 "remove invalid RD request",
194 "invalid EE context number",
195 "invalid EE context state",
197 "response timeout error",
201 #define COMPLETION_MSG(status) \
202 ((status) < ARRAY_SIZE(wc_status) ? \
203 wc_status[(status)] : "unexpected completion error")
206 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
208 if (likely(wc->status == IB_WC_SUCCESS))
211 /* WARNING: Only wr_id and status are reliable at this point */
212 if (wc->wr_id == 0ULL) {
213 if (wc->status != IB_WC_WR_FLUSH_ERR)
214 pr_err("RPC: %s: SEND: %s\n",
215 __func__, COMPLETION_MSG(wc->status));
217 struct rpcrdma_mw *r;
219 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
220 r->r.frmr.fr_state = FRMR_IS_STALE;
221 pr_err("RPC: %s: frmr %p (stale): %s\n",
222 __func__, r, COMPLETION_MSG(wc->status));
227 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
230 int budget, count, rc;
232 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
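/* The poll loop below reaps at most RPCRDMA_POLLSIZE completions per
 * ib_poll_cq() call and gives up after "budget" full batches, so a
 * single upcall never processes more than about RPCRDMA_WC_BUDGET
 * completions before the CQ is re-armed.
 */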
234 wcs = ep->rep_send_wcs;
236 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
242 rpcrdma_sendcq_process_wc(wcs++);
243 } while (rc == RPCRDMA_POLLSIZE && --budget);
248 * Handle send, fast_reg_mr, and local_inv completions.
250 * Send events are typically suppressed and thus do not result
251 * in an upcall. Occasionally one is signaled, however. This
252 * prevents the provider's completion queue from wrapping and
253 * losing a completion.
256 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
258 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
261 rc = rpcrdma_sendcq_poll(cq, ep);
263 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
268 rc = ib_req_notify_cq(cq,
269 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
273 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
278 rpcrdma_sendcq_poll(cq, ep);
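/* ib_req_notify_cq() with IB_CQ_REPORT_MISSED_EVENTS returns a positive
 * value when completions may have arrived while the CQ was being
 * re-armed; the extra poll above reaps them so none are stranded until
 * the next interrupt.
 */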
282 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
284 struct rpcrdma_rep *rep =
285 (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
287 /* WARNING: Only wr_id and status are reliable at this point */
288 if (wc->status != IB_WC_SUCCESS)
291 /* status == SUCCESS means all fields in wc are trustworthy */
292 if (wc->opcode != IB_WC_RECV)
295 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
296 __func__, rep, wc->byte_len);
298 rep->rr_len = wc->byte_len;
299 ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
300 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
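/* A reply of at least 16 bytes covers the fixed words of the
 * RPC-over-RDMA header, including rm_credit, the server's credit grant.
 * The grant is clamped below to the range [1, rb_max_requests] and then
 * published through rb_credits.
 */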
302 if (rep->rr_len >= 16) {
303 struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
304 unsigned int credits = ntohl(p->rm_credit);
307 credits = 1; /* don't deadlock */
308 else if (credits > rep->rr_buffer->rb_max_requests)
309 credits = rep->rr_buffer->rb_max_requests;
310 atomic_set(&rep->rr_buffer->rb_credits, credits);
314 list_add_tail(&rep->rr_list, sched_list);
317 if (wc->status != IB_WC_WR_FLUSH_ERR)
318 pr_err("RPC: %s: rep %p: %s\n",
319 __func__, rep, COMPLETION_MSG(wc->status));
325 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
327 struct list_head sched_list;
329 int budget, count, rc;
331 INIT_LIST_HEAD(&sched_list);
332 budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
334 wcs = ep->rep_recv_wcs;
336 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
342 rpcrdma_recvcq_process_wc(wcs++, &sched_list);
343 } while (rc == RPCRDMA_POLLSIZE && --budget);
347 rpcrdma_schedule_tasklet(&sched_list);
352 * Handle receive completions.
354 * It is reentrant but processes events one at a time, preserving the
355 * ordering of receives so that server credit accounting stays accurate.
357 * It is the responsibility of the scheduled tasklet to return
358 * recv buffers to the pool. NOTE: this affects synchronization of
359 * connection shutdown. That is, the structures required for
360 * the completion of the reply handler must remain intact until
361 * all memory has been reclaimed.
364 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
366 struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
369 rc = rpcrdma_recvcq_poll(cq, ep);
371 dprintk("RPC: %s: ib_poll_cq failed: %i\n",
376 rc = ib_req_notify_cq(cq,
377 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
381 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
386 rpcrdma_recvcq_poll(cq, ep);
390 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
393 LIST_HEAD(sched_list);
395 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
396 rpcrdma_recvcq_process_wc(&wc, &sched_list);
397 if (!list_empty(&sched_list))
398 rpcrdma_schedule_tasklet(&sched_list);
399 while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
400 rpcrdma_sendcq_process_wc(&wc);
403 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
404 static const char * const conn[] = {
423 #define CONNECTION_MSG(status) \
424 ((status) < ARRAY_SIZE(conn) ? \
425 conn[(status)] : "unrecognized connection error")
429 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
431 struct rpcrdma_xprt *xprt = id->context;
432 struct rpcrdma_ia *ia = &xprt->rx_ia;
433 struct rpcrdma_ep *ep = &xprt->rx_ep;
434 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
435 struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
437 struct ib_qp_attr attr;
438 struct ib_qp_init_attr iattr;
441 switch (event->event) {
442 case RDMA_CM_EVENT_ADDR_RESOLVED:
443 case RDMA_CM_EVENT_ROUTE_RESOLVED:
445 complete(&ia->ri_done);
447 case RDMA_CM_EVENT_ADDR_ERROR:
448 ia->ri_async_rc = -EHOSTUNREACH;
449 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
451 complete(&ia->ri_done);
453 case RDMA_CM_EVENT_ROUTE_ERROR:
454 ia->ri_async_rc = -ENETUNREACH;
455 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
457 complete(&ia->ri_done);
459 case RDMA_CM_EVENT_ESTABLISHED:
461 ib_query_qp(ia->ri_id->qp, &attr,
462 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
464 dprintk("RPC: %s: %d responder resources"
466 __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
468 case RDMA_CM_EVENT_CONNECT_ERROR:
469 connstate = -ENOTCONN;
471 case RDMA_CM_EVENT_UNREACHABLE:
472 connstate = -ENETDOWN;
474 case RDMA_CM_EVENT_REJECTED:
475 connstate = -ECONNREFUSED;
477 case RDMA_CM_EVENT_DISCONNECTED:
478 connstate = -ECONNABORTED;
480 case RDMA_CM_EVENT_DEVICE_REMOVAL:
483 atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
484 dprintk("RPC: %s: %sconnected\n",
485 __func__, connstate > 0 ? "" : "dis");
486 ep->rep_connected = connstate;
488 wake_up_all(&ep->rep_connect_wait);
491 dprintk("RPC: %s: %pI4:%u (ep 0x%p): %s\n",
492 __func__, &addr->sin_addr.s_addr,
493 ntohs(addr->sin_port), ep,
494 CONNECTION_MSG(event->event));
498 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
499 if (connstate == 1) {
500 int ird = attr.max_dest_rd_atomic;
501 int tird = ep->rep_remote_cma.responder_resources;
502 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
503 "on %s, memreg %d slots %d ird %d%s\n",
504 &addr->sin_addr.s_addr,
505 ntohs(addr->sin_port),
506 ia->ri_id->device->name,
507 ia->ri_memreg_strategy,
508 xprt->rx_buf.rb_max_requests,
509 ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
510 } else if (connstate < 0) {
511 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
512 &addr->sin_addr.s_addr,
513 ntohs(addr->sin_port),
521 static struct rdma_cm_id *
522 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
523 struct rpcrdma_ia *ia, struct sockaddr *addr)
525 struct rdma_cm_id *id;
528 init_completion(&ia->ri_done);
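/* Address and route resolution are asynchronous: rpcrdma_conn_upcall()
 * records the outcome in ia->ri_async_rc and completes ia->ri_done.
 * ri_async_rc is preset to -ETIMEDOUT before each step so that a wait
 * which times out without an upcall is reported as such.
 */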
530 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
533 dprintk("RPC: %s: rdma_create_id() failed %i\n",
538 ia->ri_async_rc = -ETIMEDOUT;
539 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
541 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
545 wait_for_completion_interruptible_timeout(&ia->ri_done,
546 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
547 rc = ia->ri_async_rc;
551 ia->ri_async_rc = -ETIMEDOUT;
552 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
554 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
558 wait_for_completion_interruptible_timeout(&ia->ri_done,
559 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
560 rc = ia->ri_async_rc;
572 * Drain any cq prior to teardown.
575 rpcrdma_clean_cq(struct ib_cq *cq)
580 while (1 == ib_poll_cq(cq, 1, &wc))
584 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
585 __func__, count, wc.opcode);
589 * Exported functions.
593 * Open and initialize an Interface Adapter.
594 * o initializes fields of struct rpcrdma_ia, including
595 * interface and provider attributes and protection domain.
598 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
601 struct ib_device_attr devattr;
602 struct rpcrdma_ia *ia = &xprt->rx_ia;
604 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
605 if (IS_ERR(ia->ri_id)) {
606 rc = PTR_ERR(ia->ri_id);
610 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
611 if (IS_ERR(ia->ri_pd)) {
612 rc = PTR_ERR(ia->ri_pd);
613 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
619 * Query the device to determine if the requested memory
620 * registration strategy is supported. If it isn't, set the
621 * strategy to a globally supported model.
623 rc = ib_query_device(ia->ri_id->device, &devattr);
625 dprintk("RPC: %s: ib_query_device failed %d\n",
630 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
631 ia->ri_have_dma_lkey = 1;
632 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
635 if (memreg == RPCRDMA_FRMR) {
636 /* Requires both frmr reg and local dma lkey */
637 if ((devattr.device_cap_flags &
638 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
639 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
640 dprintk("RPC: %s: FRMR registration "
641 "not supported by HCA\n", __func__);
642 memreg = RPCRDMA_MTHCAFMR;
644 /* Mind the ia limit on FRMR page list depth */
645 ia->ri_max_frmr_depth = min_t(unsigned int,
646 RPCRDMA_MAX_DATA_SEGS,
647 devattr.max_fast_reg_page_list_len);
650 if (memreg == RPCRDMA_MTHCAFMR) {
651 if (!ia->ri_id->device->alloc_fmr) {
652 dprintk("RPC: %s: MTHCAFMR registration "
653 "not supported by HCA\n", __func__);
654 memreg = RPCRDMA_ALLPHYSICAL;
659 * Optionally obtain an underlying physical identity mapping in
660 * order to do a memory window-based bind. This base registration
661 * is protected from remote access - that is enabled only by binding
662 * for the specific bytes targeted during each RPC operation, and
663 * revoked after the corresponding completion, similar to a storage adapter.
669 case RPCRDMA_ALLPHYSICAL:
670 mem_priv = IB_ACCESS_LOCAL_WRITE |
671 IB_ACCESS_REMOTE_WRITE |
672 IB_ACCESS_REMOTE_READ;
674 case RPCRDMA_MTHCAFMR:
675 if (ia->ri_have_dma_lkey)
677 mem_priv = IB_ACCESS_LOCAL_WRITE;
679 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
680 if (IS_ERR(ia->ri_bind_mem)) {
681 printk(KERN_ALERT "%s: ib_get_dma_mr for "
682 "phys register failed with %lX\n",
683 __func__, PTR_ERR(ia->ri_bind_mem));
689 printk(KERN_ERR "RPC: Unsupported memory "
690 "registration mode: %d\n", memreg);
694 dprintk("RPC: %s: memory registration strategy is %d\n",
697 /* Else will do memory reg/dereg for each chunk */
698 ia->ri_memreg_strategy = memreg;
700 rwlock_init(&ia->ri_qplock);
703 rdma_destroy_id(ia->ri_id);
710 * Clean up/close an IA.
711 * o if event handles and PD have been initialized, free them.
715 rpcrdma_ia_close(struct rpcrdma_ia *ia)
719 dprintk("RPC: %s: entering\n", __func__);
720 if (ia->ri_bind_mem != NULL) {
721 rc = ib_dereg_mr(ia->ri_bind_mem);
722 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
725 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
727 rdma_destroy_qp(ia->ri_id);
728 rdma_destroy_id(ia->ri_id);
731 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
732 rc = ib_dealloc_pd(ia->ri_pd);
733 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
739 * Create unconnected endpoint.
742 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
743 struct rpcrdma_create_data_internal *cdata)
745 struct ib_device_attr devattr;
746 struct ib_cq *sendcq, *recvcq;
749 rc = ib_query_device(ia->ri_id->device, &devattr);
751 dprintk("RPC: %s: ib_query_device failed %d\n",
756 /* check provider's send/recv wr limits */
757 if (cdata->max_requests > devattr.max_qp_wr)
758 cdata->max_requests = devattr.max_qp_wr;
760 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
761 ep->rep_attr.qp_context = ep;
762 /* send_cq and recv_cq initialized below */
763 ep->rep_attr.srq = NULL;
764 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
765 switch (ia->ri_memreg_strategy) {
769 /* Add room for frmr register and invalidate WRs.
770 * 1. FRMR reg WR for head
771 * 2. FRMR invalidate WR for head
772 * 3. N FRMR reg WRs for pagelist
773 * 4. N FRMR invalidate WRs for pagelist
774 * 5. FRMR reg WR for tail
775 * 6. FRMR invalidate WR for tail
776 * 7. The RDMA_SEND WR
779 /* Calculate N if the device max FRMR depth is smaller than
780 * RPCRDMA_MAX_DATA_SEGS.
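* Worked example (assuming the elided initial depth of 7): with an
* ri_max_frmr_depth of 16 pages and RPCRDMA_MAX_DATA_SEGS of 64
* (illustrative values), delta starts at 48, the loop below runs three
* times, and depth becomes 7 + 6 = 13 WRs per request.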
782 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
783 int delta = RPCRDMA_MAX_DATA_SEGS -
784 ia->ri_max_frmr_depth;
787 depth += 2; /* FRMR reg + invalidate */
788 delta -= ia->ri_max_frmr_depth;
792 ep->rep_attr.cap.max_send_wr *= depth;
793 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
794 cdata->max_requests = devattr.max_qp_wr / depth;
795 if (!cdata->max_requests)
797 ep->rep_attr.cap.max_send_wr = cdata->max_requests *
805 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
806 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
807 ep->rep_attr.cap.max_recv_sge = 1;
808 ep->rep_attr.cap.max_inline_data = 0;
809 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
810 ep->rep_attr.qp_type = IB_QPT_RC;
811 ep->rep_attr.port_num = ~0;
813 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
814 "iovs: send %d recv %d\n",
816 ep->rep_attr.cap.max_send_wr,
817 ep->rep_attr.cap.max_recv_wr,
818 ep->rep_attr.cap.max_send_sge,
819 ep->rep_attr.cap.max_recv_sge);
821 /* set trigger for requesting send completion */
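/* rep_cqinit is the number of SENDs allowed to complete silently between
 * signalled ones: rpcrdma_ep_post() decrements a counter (DECR_CQCOUNT)
 * for each posted WR and requests IB_SEND_SIGNALED only when the count
 * runs out, which keeps the send CQ from wrapping while completions are
 * suppressed.
 */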
822 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
823 if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
824 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
825 else if (ep->rep_cqinit <= 2)
828 init_waitqueue_head(&ep->rep_connect_wait);
829 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
831 sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
832 rpcrdma_cq_async_error_upcall, ep,
833 ep->rep_attr.cap.max_send_wr + 1, 0);
834 if (IS_ERR(sendcq)) {
835 rc = PTR_ERR(sendcq);
836 dprintk("RPC: %s: failed to create send CQ: %i\n",
841 rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
843 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
848 recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
849 rpcrdma_cq_async_error_upcall, ep,
850 ep->rep_attr.cap.max_recv_wr + 1, 0);
851 if (IS_ERR(recvcq)) {
852 rc = PTR_ERR(recvcq);
853 dprintk("RPC: %s: failed to create recv CQ: %i\n",
858 rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
860 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
862 ib_destroy_cq(recvcq);
866 ep->rep_attr.send_cq = sendcq;
867 ep->rep_attr.recv_cq = recvcq;
869 /* Initialize cma parameters */
871 /* RPC/RDMA does not use private data */
872 ep->rep_remote_cma.private_data = NULL;
873 ep->rep_remote_cma.private_data_len = 0;
875 /* Client offers RDMA Read but does not initiate */
876 ep->rep_remote_cma.initiator_depth = 0;
877 if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
878 ep->rep_remote_cma.responder_resources = 32;
880 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
882 ep->rep_remote_cma.retry_count = 7;
883 ep->rep_remote_cma.flow_control = 0;
884 ep->rep_remote_cma.rnr_retry_count = 0;
889 err = ib_destroy_cq(sendcq);
891 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
900 * Disconnect and destroy endpoint. After this, the only
901 * valid operations on the ep are to free it (if dynamically
902 * allocated) or re-create it.
905 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
909 dprintk("RPC: %s: entering, connected is %d\n",
910 __func__, ep->rep_connected);
912 cancel_delayed_work_sync(&ep->rep_connect_worker);
915 rpcrdma_ep_disconnect(ep, ia);
916 rdma_destroy_qp(ia->ri_id);
917 ia->ri_id->qp = NULL;
920 /* padding - could be done in rpcrdma_buffer_destroy... */
921 if (ep->rep_pad_mr) {
922 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
923 ep->rep_pad_mr = NULL;
926 rpcrdma_clean_cq(ep->rep_attr.recv_cq);
927 rc = ib_destroy_cq(ep->rep_attr.recv_cq);
929 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
932 rpcrdma_clean_cq(ep->rep_attr.send_cq);
933 rc = ib_destroy_cq(ep->rep_attr.send_cq);
935 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
940 * Connect unconnected endpoint.
943 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
945 struct rdma_cm_id *id, *old;
949 if (ep->rep_connected != 0) {
950 struct rpcrdma_xprt *xprt;
952 dprintk("RPC: %s: reconnecting...\n", __func__);
954 rpcrdma_ep_disconnect(ep, ia);
955 rpcrdma_flush_cqs(ep);
957 switch (ia->ri_memreg_strategy) {
959 rpcrdma_reset_frmrs(ia);
961 case RPCRDMA_MTHCAFMR:
962 rpcrdma_reset_fmrs(ia);
964 case RPCRDMA_ALLPHYSICAL:
971 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
972 id = rpcrdma_create_id(xprt, ia,
973 (struct sockaddr *)&xprt->rx_data.addr);
978 /* TEMP TEMP TEMP - fail if new device:
979 * Deregister/remarshal *all* requests!
980 * Close and recreate adapter, pd, etc!
981 * Re-determine all attributes still sane!
982 * More stuff I haven't thought of!
985 if (ia->ri_id->device != id->device) {
986 printk("RPC: %s: can't reconnect on "
987 "different device!\n", __func__);
993 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
995 dprintk("RPC: %s: rdma_create_qp failed %i\n",
1002 write_lock(&ia->ri_qplock);
1005 write_unlock(&ia->ri_qplock);
1007 rdma_destroy_qp(old);
1008 rdma_destroy_id(old);
1010 dprintk("RPC: %s: connecting...\n", __func__);
1011 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
1013 dprintk("RPC: %s: rdma_create_qp failed %i\n",
1015 /* do not update ep->rep_connected */
1016 return -ENETUNREACH;
1020 ep->rep_connected = 0;
1022 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
1024 dprintk("RPC: %s: rdma_connect() failed with %i\n",
1029 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
1032 * Check state. A non-peer reject indicates no listener
1033 * (ECONNREFUSED), which may be a transient state. All
1034 * others indicate a transport condition which has already
1035 * undergone best-effort recovery.
1037 if (ep->rep_connected == -ECONNREFUSED &&
1038 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
1039 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
1042 if (ep->rep_connected <= 0) {
1043 /* Sometimes, the only way to reliably connect to remote
1044 * CMs is to use the same nonzero values for ORD and IRD. */
1045 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1046 (ep->rep_remote_cma.responder_resources == 0 ||
1047 ep->rep_remote_cma.initiator_depth !=
1048 ep->rep_remote_cma.responder_resources)) {
1049 if (ep->rep_remote_cma.responder_resources == 0)
1050 ep->rep_remote_cma.responder_resources = 1;
1051 ep->rep_remote_cma.initiator_depth =
1052 ep->rep_remote_cma.responder_resources;
1055 rc = ep->rep_connected;
1057 dprintk("RPC: %s: connected\n", __func__);
1062 ep->rep_connected = rc;
1067 * rpcrdma_ep_disconnect
1069 * This is separate from destroy to facilitate the ability
1070 * to reconnect without recreating the endpoint.
1072 * This call is not reentrant, and must not be made in parallel
1073 * on the same endpoint.
1076 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1080 rpcrdma_flush_cqs(ep);
1081 rc = rdma_disconnect(ia->ri_id);
1083 /* returns without wait if not connected */
1084 wait_event_interruptible(ep->rep_connect_wait,
1085 ep->rep_connected != 1);
1086 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
1087 (ep->rep_connected == 1) ? "still " : "dis");
1089 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
1090 ep->rep_connected = rc;
1095 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1097 int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1098 struct ib_fmr_attr fmr_attr = {
1099 .max_pages = RPCRDMA_MAX_DATA_SEGS,
1101 .page_shift = PAGE_SHIFT
1103 struct rpcrdma_mw *r;
1106 i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1107 dprintk("RPC:       %s: initializing %d FMRs\n", __func__, i);
1110 r = kzalloc(sizeof(*r), GFP_KERNEL);
1114 r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1115 if (IS_ERR(r->r.fmr)) {
1116 rc = PTR_ERR(r->r.fmr);
1117 dprintk("RPC: %s: ib_alloc_fmr failed %i\n",
1122 list_add(&r->mw_list, &buf->rb_mws);
1123 list_add(&r->mw_all, &buf->rb_all);
1133 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1135 struct rpcrdma_frmr *f;
1136 struct rpcrdma_mw *r;
1139 i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1140 dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);
1143 r = kzalloc(sizeof(*r), GFP_KERNEL);
1148 f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1149 ia->ri_max_frmr_depth);
1150 if (IS_ERR(f->fr_mr)) {
1151 rc = PTR_ERR(f->fr_mr);
1152 dprintk("RPC: %s: ib_alloc_fast_reg_mr "
1153 "failed %i\n", __func__, rc);
1157 f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1158 ia->ri_max_frmr_depth);
1159 if (IS_ERR(f->fr_pgl)) {
1160 rc = PTR_ERR(f->fr_pgl);
1161 dprintk("RPC: %s: ib_alloc_fast_reg_page_list "
1162 "failed %i\n", __func__, rc);
1164 ib_dereg_mr(f->fr_mr);
1168 list_add(&r->mw_list, &buf->rb_mws);
1169 list_add(&r->mw_all, &buf->rb_all);
1180 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1181 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1184 size_t len, rlen, wlen;
1187 buf->rb_max_requests = cdata->max_requests;
1188 spin_lock_init(&buf->rb_lock);
1189 atomic_set(&buf->rb_credits, 1);
1191 /* Need to allocate:
1192 * 1. arrays for send and recv pointers
1193 * 2. arrays of struct rpcrdma_req to fill in pointers
1194 * 3. array of struct rpcrdma_rep for replies
1195 * 4. padding, if any
1196 * Send/recv buffers in req/rep need to be registered
1198 len = buf->rb_max_requests *
1199 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1200 len += cdata->padding;
1202 p = kzalloc(len, GFP_KERNEL);
1204 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1209 buf->rb_pool = p; /* for freeing it later */
1211 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1212 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1213 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1214 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1217 * Register the zeroed pad buffer, if any.
1219 if (cdata->padding) {
1220 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1221 &ep->rep_pad_mr, &ep->rep_pad);
1225 p += cdata->padding;
1227 INIT_LIST_HEAD(&buf->rb_mws);
1228 INIT_LIST_HEAD(&buf->rb_all);
1229 switch (ia->ri_memreg_strategy) {
1231 rc = rpcrdma_init_frmrs(ia, buf);
1235 case RPCRDMA_MTHCAFMR:
1236 rc = rpcrdma_init_fmrs(ia, buf);
1245 * Allocate/init the request/reply buffers. Doing this
1246 * using kmalloc for now -- one for each buf.
1248 wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1249 rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1250 dprintk("RPC: %s: wlen = %zu, rlen = %zu\n",
1251 __func__, wlen, rlen);
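/* 1 << fls(x) rounds each buffer size up to a power of two: fls()
 * returns the index of the highest set bit, so an inline_wsize of 1024
 * plus a (hypothetical) 200-byte struct rpcrdma_req gives 1224,
 * fls(1224) == 11, and wlen becomes 2048.
 */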
1253 for (i = 0; i < buf->rb_max_requests; i++) {
1254 struct rpcrdma_req *req;
1255 struct rpcrdma_rep *rep;
1257 req = kmalloc(wlen, GFP_KERNEL);
1259 dprintk("RPC: %s: request buffer %d alloc"
1260 " failed\n", __func__, i);
1264 memset(req, 0, sizeof(struct rpcrdma_req));
1265 buf->rb_send_bufs[i] = req;
1266 buf->rb_send_bufs[i]->rl_buffer = buf;
1268 rc = rpcrdma_register_internal(ia, req->rl_base,
1269 wlen - offsetof(struct rpcrdma_req, rl_base),
1270 &buf->rb_send_bufs[i]->rl_handle,
1271 &buf->rb_send_bufs[i]->rl_iov);
1275 buf->rb_send_bufs[i]->rl_size = wlen -
1276 sizeof(struct rpcrdma_req);
1278 rep = kmalloc(rlen, GFP_KERNEL);
1280 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1285 memset(rep, 0, sizeof(struct rpcrdma_rep));
1286 buf->rb_recv_bufs[i] = rep;
1287 buf->rb_recv_bufs[i]->rr_buffer = buf;
1289 rc = rpcrdma_register_internal(ia, rep->rr_base,
1290 rlen - offsetof(struct rpcrdma_rep, rr_base),
1291 &buf->rb_recv_bufs[i]->rr_handle,
1292 &buf->rb_recv_bufs[i]->rr_iov);
1297 dprintk("RPC: %s: max_requests %d\n",
1298 __func__, buf->rb_max_requests);
1302 rpcrdma_buffer_destroy(buf);
1307 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1309 struct rpcrdma_mw *r;
1312 while (!list_empty(&buf->rb_all)) {
1313 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1314 list_del(&r->mw_all);
1315 list_del(&r->mw_list);
1317 rc = ib_dealloc_fmr(r->r.fmr);
1319 dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
1327 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1329 struct rpcrdma_mw *r;
1332 while (!list_empty(&buf->rb_all)) {
1333 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1334 list_del(&r->mw_all);
1335 list_del(&r->mw_list);
1337 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1339 dprintk("RPC: %s: ib_dereg_mr failed %i\n",
1341 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1348 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1350 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1353 /* clean up in reverse order from create
1354 * 1. recv mr memory (mr free, then kfree)
1355 * 2. send mr memory (mr free, then kfree)
1358 dprintk("RPC: %s: entering\n", __func__);
1360 for (i = 0; i < buf->rb_max_requests; i++) {
1361 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1362 rpcrdma_deregister_internal(ia,
1363 buf->rb_recv_bufs[i]->rr_handle,
1364 &buf->rb_recv_bufs[i]->rr_iov);
1365 kfree(buf->rb_recv_bufs[i]);
1367 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1368 rpcrdma_deregister_internal(ia,
1369 buf->rb_send_bufs[i]->rl_handle,
1370 &buf->rb_send_bufs[i]->rl_iov);
1371 kfree(buf->rb_send_bufs[i]);
1375 switch (ia->ri_memreg_strategy) {
1377 rpcrdma_destroy_frmrs(buf);
1379 case RPCRDMA_MTHCAFMR:
1380 rpcrdma_destroy_fmrs(buf);
1386 kfree(buf->rb_pool);
1389 /* After a disconnect, unmap all FMRs.
1391 * This is invoked only in the transport connect worker in order
1392 * to serialize with rpcrdma_register_fmr_external().
1395 rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1397 struct rpcrdma_xprt *r_xprt =
1398 container_of(ia, struct rpcrdma_xprt, rx_ia);
1399 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1400 struct list_head *pos;
1401 struct rpcrdma_mw *r;
1405 list_for_each(pos, &buf->rb_all) {
1406 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1409 list_add(&r->r.fmr->list, &l);
1410 rc = ib_unmap_fmr(&l);
1412 dprintk("RPC: %s: ib_unmap_fmr failed %i\n",
1417 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1418 * an unusable state. Find FRMRs in this state and dereg / reg
1419 * each. FRMRs that are VALID and attached to an rpcrdma_req are reset too.
1422 * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1424 * This is invoked only in the transport connect worker in order
1425 * to serialize with rpcrdma_register_frmr_external().
1428 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1430 struct rpcrdma_xprt *r_xprt =
1431 container_of(ia, struct rpcrdma_xprt, rx_ia);
1432 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1433 struct list_head *pos;
1434 struct rpcrdma_mw *r;
1437 list_for_each(pos, &buf->rb_all) {
1438 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1440 if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1443 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1445 dprintk("RPC: %s: ib_dereg_mr failed %i\n",
1447 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1449 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1450 ia->ri_max_frmr_depth);
1451 if (IS_ERR(r->r.frmr.fr_mr)) {
1452 rc = PTR_ERR(r->r.frmr.fr_mr);
1453 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1454 " failed %i\n", __func__, rc);
1457 r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1459 ia->ri_max_frmr_depth);
1460 if (IS_ERR(r->r.frmr.fr_pgl)) {
1461 rc = PTR_ERR(r->r.frmr.fr_pgl);
1463 "ib_alloc_fast_reg_page_list "
1464 "failed %i\n", __func__, rc);
1466 ib_dereg_mr(r->r.frmr.fr_mr);
1469 r->r.frmr.fr_state = FRMR_IS_INVALID;
1473 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1474 * some req segments uninitialized.
1477 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1480 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1485 /* Cycle mw's back in reverse order, and "spin" them.
1486 * This delays and scrambles reuse as much as possible.
1489 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1491 struct rpcrdma_mr_seg *seg = req->rl_segments;
1492 struct rpcrdma_mr_seg *seg1 = seg;
1495 for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1496 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1497 rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1501 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1503 buf->rb_send_bufs[--buf->rb_send_index] = req;
1505 if (req->rl_reply) {
1506 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1507 req->rl_reply->rr_func = NULL;
1508 req->rl_reply = NULL;
1512 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1513 * Redo only the ib_post_send().
1516 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1518 struct rpcrdma_xprt *r_xprt =
1519 container_of(ia, struct rpcrdma_xprt, rx_ia);
1520 struct ib_send_wr invalidate_wr, *bad_wr;
1523 dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
1525 /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1526 r->r.frmr.fr_state = FRMR_IS_INVALID;
1528 memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1529 invalidate_wr.wr_id = (unsigned long)(void *)r;
1530 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531 invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1532 DECR_CQCOUNT(&r_xprt->rx_ep);
1534 dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
1535 __func__, r, r->r.frmr.fr_mr->rkey);
1537 read_lock(&ia->ri_qplock);
1538 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1539 read_unlock(&ia->ri_qplock);
1541 /* Force rpcrdma_buffer_get() to retry */
1542 r->r.frmr.fr_state = FRMR_IS_STALE;
1543 dprintk("RPC: %s: ib_post_send failed, %i\n",
1549 rpcrdma_retry_flushed_linv(struct list_head *stale,
1550 struct rpcrdma_buffer *buf)
1552 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1553 struct list_head *pos;
1554 struct rpcrdma_mw *r;
1555 unsigned long flags;
1557 list_for_each(pos, stale) {
1558 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1559 rpcrdma_retry_local_inv(r, ia);
1562 spin_lock_irqsave(&buf->rb_lock, flags);
1563 list_splice_tail(stale, &buf->rb_mws);
1564 spin_unlock_irqrestore(&buf->rb_lock, flags);
1567 static struct rpcrdma_req *
1568 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1569 struct list_head *stale)
1571 struct rpcrdma_mw *r;
1574 i = RPCRDMA_MAX_SEGS - 1;
1575 while (!list_empty(&buf->rb_mws)) {
1576 r = list_entry(buf->rb_mws.next,
1577 struct rpcrdma_mw, mw_list);
1578 list_del(&r->mw_list);
1579 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1580 list_add(&r->mw_list, stale);
1583 req->rl_segments[i].rl_mw = r;
1584 if (unlikely(i-- == 0))
1585 return req; /* Success */
1588 /* Not enough entries on rb_mws for this req */
1589 rpcrdma_buffer_put_sendbuf(req, buf);
1590 rpcrdma_buffer_put_mrs(req, buf);
1594 static struct rpcrdma_req *
1595 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1597 struct rpcrdma_mw *r;
1600 i = RPCRDMA_MAX_SEGS - 1;
1601 while (!list_empty(&buf->rb_mws)) {
1602 r = list_entry(buf->rb_mws.next,
1603 struct rpcrdma_mw, mw_list);
1604 list_del(&r->mw_list);
1605 req->rl_segments[i].rl_mw = r;
1606 if (unlikely(i-- == 0))
1607 return req; /* Success */
1610 /* Not enough entries on rb_mws for this req */
1611 rpcrdma_buffer_put_sendbuf(req, buf);
1612 rpcrdma_buffer_put_mrs(req, buf);
1617 * Get a set of request/reply buffers.
1619 * Reply buffer (if needed) is attached to send buffer upon return.
1621 * rb_send_index and rb_recv_index MUST always be pointing to the
1622 * *next* available buffer (non-NULL). They are incremented after
1623 * removing buffers, and decremented *before* returning them.
1625 struct rpcrdma_req *
1626 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1628 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1629 struct list_head stale;
1630 struct rpcrdma_req *req;
1631 unsigned long flags;
1633 spin_lock_irqsave(&buffers->rb_lock, flags);
1634 if (buffers->rb_send_index == buffers->rb_max_requests) {
1635 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1636 dprintk("RPC: %s: out of request buffers\n", __func__);
1637 return ((struct rpcrdma_req *)NULL);
1640 req = buffers->rb_send_bufs[buffers->rb_send_index];
1641 if (buffers->rb_send_index < buffers->rb_recv_index) {
1642 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1644 buffers->rb_recv_index - buffers->rb_send_index);
1645 req->rl_reply = NULL;
1647 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1648 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1650 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1652 INIT_LIST_HEAD(&stale);
1653 switch (ia->ri_memreg_strategy) {
1655 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1657 case RPCRDMA_MTHCAFMR:
1658 req = rpcrdma_buffer_get_fmrs(req, buffers);
1663 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1664 if (!list_empty(&stale))
1665 rpcrdma_retry_flushed_linv(&stale, buffers);
1670 * Put request/reply buffers back into pool.
1671 * Pre-decrement counter/array index.
1674 rpcrdma_buffer_put(struct rpcrdma_req *req)
1676 struct rpcrdma_buffer *buffers = req->rl_buffer;
1677 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1678 unsigned long flags;
1680 spin_lock_irqsave(&buffers->rb_lock, flags);
1681 rpcrdma_buffer_put_sendbuf(req, buffers);
1682 switch (ia->ri_memreg_strategy) {
1684 case RPCRDMA_MTHCAFMR:
1685 rpcrdma_buffer_put_mrs(req, buffers);
1690 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1694 * Recover reply buffers from pool.
1695 * This happens when recovering from error conditions.
1696 * Post-increment counter/array index.
1699 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1701 struct rpcrdma_buffer *buffers = req->rl_buffer;
1702 unsigned long flags;
1704 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1705 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1706 spin_lock_irqsave(&buffers->rb_lock, flags);
1707 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1708 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1709 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1711 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1715 * Put reply buffers back into pool when not attached to
1716 * request. This happens in error conditions.
1719 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1721 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1722 unsigned long flags;
1724 rep->rr_func = NULL;
1725 spin_lock_irqsave(&buffers->rb_lock, flags);
1726 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1727 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1731 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1735 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1736 struct ib_mr **mrp, struct ib_sge *iov)
1738 struct ib_phys_buf ipb;
1743 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
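* The lkey for the sge is taken from the first available source: the
* device's global DMA lkey, the DMA MR in ia->ri_bind_mem set up at IA
* open time, or, failing both, a one-off ib_reg_phys_mr() registration
* of this buffer.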
1745 iov->addr = ib_dma_map_single(ia->ri_id->device,
1746 va, len, DMA_BIDIRECTIONAL);
1747 if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1752 if (ia->ri_have_dma_lkey) {
1754 iov->lkey = ia->ri_dma_lkey;
1756 } else if (ia->ri_bind_mem != NULL) {
1758 iov->lkey = ia->ri_bind_mem->lkey;
1762 ipb.addr = iov->addr;
1763 ipb.size = iov->length;
1764 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1765 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1767 dprintk("RPC: %s: phys convert: 0x%llx "
1768 "registered 0x%llx length %d\n",
1769 __func__, (unsigned long long)ipb.addr,
1770 (unsigned long long)iov->addr, len);
1775 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1778 iov->lkey = mr->lkey;
1786 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1787 struct ib_mr *mr, struct ib_sge *iov)
1791 ib_dma_unmap_single(ia->ri_id->device,
1792 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1797 rc = ib_dereg_mr(mr);
1799 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1804 * Wrappers for chunk registration, shared by read/write chunk code.
1808 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1810 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1811 seg->mr_dmalen = seg->mr_len;
1813 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1814 seg->mr_page, offset_in_page(seg->mr_offset),
1815 seg->mr_dmalen, seg->mr_dir);
1817 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1819 seg->mr_dmalen, seg->mr_dir);
1820 if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1821 dprintk("RPC: %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1823 (unsigned long long)seg->mr_dma,
1824 seg->mr_offset, seg->mr_dmalen);
1829 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1832 ib_dma_unmap_page(ia->ri_id->device,
1833 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1835 ib_dma_unmap_single(ia->ri_id->device,
1836 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1840 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1841 int *nsegs, int writing, struct rpcrdma_ia *ia,
1842 struct rpcrdma_xprt *r_xprt)
1844 struct rpcrdma_mr_seg *seg1 = seg;
1845 struct rpcrdma_mw *mw = seg1->rl_mw;
1846 struct rpcrdma_frmr *frmr = &mw->r.frmr;
1847 struct ib_mr *mr = frmr->fr_mr;
1848 struct ib_send_wr fastreg_wr, *bad_wr;
1856 pageoff = offset_in_page(seg1->mr_offset);
1857 seg1->mr_offset -= pageoff; /* start of page */
1858 seg1->mr_len += pageoff;
1860 if (*nsegs > ia->ri_max_frmr_depth)
1861 *nsegs = ia->ri_max_frmr_depth;
1862 for (page_no = i = 0; i < *nsegs;) {
1863 rpcrdma_map_one(ia, seg, writing);
1865 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1866 frmr->fr_pgl->page_list[page_no++] = pa;
1872 /* Check for holes */
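/* A hole is an intermediate segment that does not start and end on a
 * page boundary: one FRMR describes a single, page-aligned region, so
 * mapping stops at the first such gap and any remaining segments are
 * registered separately.
 */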
1873 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1874 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1877 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1880 frmr->fr_state = FRMR_IS_VALID;
1882 memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1883 fastreg_wr.wr_id = (unsigned long)(void *)mw;
1884 fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1885 fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1886 fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1887 fastreg_wr.wr.fast_reg.page_list_len = page_no;
1888 fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1889 fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1890 if (fastreg_wr.wr.fast_reg.length < len) {
1896 key = (u8)(mr->rkey & 0x000000FF);
1897 ib_update_fast_reg_key(mr, ++key);
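/* Bumping the low-order byte of the rkey before each registration makes
 * the new rkey differ from any rkey a peer may still hold from an
 * earlier registration of this MR, so stale rkeys cannot reach the
 * newly registered memory.
 */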
1899 fastreg_wr.wr.fast_reg.access_flags = (writing ?
1900 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1901 IB_ACCESS_REMOTE_READ);
1902 fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1903 DECR_CQCOUNT(&r_xprt->rx_ep);
1905 rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1907 dprintk("RPC: %s: failed ib_post_send for register,"
1908 " status %i\n", __func__, rc);
1909 ib_update_fast_reg_key(mr, --key);
1912 seg1->mr_rkey = mr->rkey;
1913 seg1->mr_base = seg1->mr_dma + pageoff;
1920 frmr->fr_state = FRMR_IS_INVALID;
1922 rpcrdma_unmap_one(ia, --seg);
1927 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1928 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1930 struct rpcrdma_mr_seg *seg1 = seg;
1931 struct ib_send_wr invalidate_wr, *bad_wr;
1934 seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1936 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1937 invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
1938 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1939 invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
1940 DECR_CQCOUNT(&r_xprt->rx_ep);
1942 read_lock(&ia->ri_qplock);
1943 while (seg1->mr_nsegs--)
1944 rpcrdma_unmap_one(ia, seg++);
1945 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1946 read_unlock(&ia->ri_qplock);
1948 /* Force rpcrdma_buffer_get() to retry */
1949 seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
1950 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1951 " status %i\n", __func__, rc);
1957 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1958 int *nsegs, int writing, struct rpcrdma_ia *ia)
1960 struct rpcrdma_mr_seg *seg1 = seg;
1961 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1962 int len, pageoff, i, rc;
1964 pageoff = offset_in_page(seg1->mr_offset);
1965 seg1->mr_offset -= pageoff; /* start of page */
1966 seg1->mr_len += pageoff;
1968 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1969 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1970 for (i = 0; i < *nsegs;) {
1971 rpcrdma_map_one(ia, seg, writing);
1972 physaddrs[i] = seg->mr_dma;
1976 /* Check for holes */
1977 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1978 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1981 rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
1983 dprintk("RPC: %s: failed ib_map_phys_fmr "
1984 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1985 len, (unsigned long long)seg1->mr_dma,
1988 rpcrdma_unmap_one(ia, --seg);
1990 seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
1991 seg1->mr_base = seg1->mr_dma + pageoff;
2000 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
2001 struct rpcrdma_ia *ia)
2003 struct rpcrdma_mr_seg *seg1 = seg;
2007 list_add(&seg1->rl_mw->r.fmr->list, &l);
2008 rc = ib_unmap_fmr(&l);
2009 read_lock(&ia->ri_qplock);
2010 while (seg1->mr_nsegs--)
2011 rpcrdma_unmap_one(ia, seg++);
2012 read_unlock(&ia->ri_qplock);
2014 dprintk("RPC: %s: failed ib_unmap_fmr,"
2015 " status %i\n", __func__, rc);
2020 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
2021 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
2023 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2026 switch (ia->ri_memreg_strategy) {
2028 case RPCRDMA_ALLPHYSICAL:
2029 rpcrdma_map_one(ia, seg, writing);
2030 seg->mr_rkey = ia->ri_bind_mem->rkey;
2031 seg->mr_base = seg->mr_dma;
2036 /* Registration using frmr memory registration */
2038 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
2041 /* Registration using fmr memory registration */
2042 case RPCRDMA_MTHCAFMR:
2043 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2056 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2057 struct rpcrdma_xprt *r_xprt)
2059 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2060 int nsegs = seg->mr_nsegs, rc;
2062 switch (ia->ri_memreg_strategy) {
2064 case RPCRDMA_ALLPHYSICAL:
2065 read_lock(&ia->ri_qplock);
2066 rpcrdma_unmap_one(ia, seg);
2067 read_unlock(&ia->ri_qplock);
2071 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2074 case RPCRDMA_MTHCAFMR:
2075 rc = rpcrdma_deregister_fmr_external(seg, ia);
2085 * Prepost any receive buffer, then post send.
2087 * Receive buffer is donated to hardware, reclaimed upon recv completion.
2090 rpcrdma_ep_post(struct rpcrdma_ia *ia,
2091 struct rpcrdma_ep *ep,
2092 struct rpcrdma_req *req)
2094 struct ib_send_wr send_wr, *send_wr_fail;
2095 struct rpcrdma_rep *rep = req->rl_reply;
2099 rc = rpcrdma_ep_post_recv(ia, ep, rep);
2102 req->rl_reply = NULL;
2105 send_wr.next = NULL;
2106 send_wr.wr_id = 0ULL; /* no send cookie */
2107 send_wr.sg_list = req->rl_send_iov;
2108 send_wr.num_sge = req->rl_niovs;
2109 send_wr.opcode = IB_WR_SEND;
2110 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
2111 ib_dma_sync_single_for_device(ia->ri_id->device,
2112 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2114 ib_dma_sync_single_for_device(ia->ri_id->device,
2115 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2117 ib_dma_sync_single_for_device(ia->ri_id->device,
2118 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2121 if (DECR_CQCOUNT(ep) > 0)
2122 send_wr.send_flags = 0;
2123 else { /* Provider must take a send completion every now and then */
2125 send_wr.send_flags = IB_SEND_SIGNALED;
2128 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2130 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
2137 * (Re)post a receive buffer.
2140 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2141 struct rpcrdma_ep *ep,
2142 struct rpcrdma_rep *rep)
2144 struct ib_recv_wr recv_wr, *recv_wr_fail;
2147 recv_wr.next = NULL;
2148 recv_wr.wr_id = (u64) (unsigned long) rep;
2149 recv_wr.sg_list = &rep->rr_iov;
2150 recv_wr.num_sge = 1;
2152 ib_dma_sync_single_for_cpu(ia->ri_id->device,
2153 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
2155 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2158 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
2163 /* Physical mapping means one Read/Write list entry per-page.
2164 * All list entries must fit within an inline buffer.
2166 * NB: The server must return a Write list for NFS READ,
2167 * which has the same constraint. Factor in the inline buffer size.
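* For example, with 1024-byte inline buffers, a 28-byte minimum header
* and 16-byte chunk segments (illustrative sizes), roughly 62 page-sized
* entries fit, so the physical-registration payload tops out around 62
* pages.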
2171 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2173 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2174 unsigned int inline_size, pages;
2176 inline_size = min_t(unsigned int,
2177 cdata->inline_wsize, cdata->inline_rsize);
2178 inline_size -= RPCRDMA_HDRLEN_MIN;
2179 pages = inline_size / sizeof(struct rpcrdma_segment);
2180 return pages << PAGE_SHIFT;
2184 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2186 return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2190 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2194 switch (r_xprt->rx_ia.ri_memreg_strategy) {
2195 case RPCRDMA_ALLPHYSICAL:
2196 result = rpcrdma_physical_max_payload(r_xprt);
2199 result = rpcrdma_mr_max_payload(r_xprt);