1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2020, Oracle and/or its affiliates.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41
42 /*
43  * rpc_rdma.c
44  *
45  * This file contains the guts of the RPC-over-RDMA protocol, and
46  * does marshaling/unmarshaling, etc. It is also where the interface
47  * to the Linux RPC framework lives.
48  */
49
50 #include <linux/highmem.h>
51
52 #include <linux/sunrpc/svc_rdma.h>
53
54 #include "xprt_rdma.h"
55 #include <trace/events/rpcrdma.h>
56
57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
58 # define RPCDBG_FACILITY        RPCDBG_TRANS
59 #endif
60
61 /* Returns size of largest RPC-over-RDMA header in a Call message
62  *
63  * The largest Call header contains a full-size Read list and a
64  * minimal Reply chunk.
65  */
66 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
67 {
68         unsigned int size;
69
70         /* Fixed header fields and list discriminators */
71         size = RPCRDMA_HDRLEN_MIN;
72
73         /* Maximum Read list size */
74         size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
75
76         /* Minimal Reply chunk size */
77         size += sizeof(__be32); /* segment count */
78         size += rpcrdma_segment_maxsz * sizeof(__be32);
79         size += sizeof(__be32); /* list discriminator */
80
81         return size;
82 }
83
84 /* Returns size of largest RPC-over-RDMA header in a Reply message
85  *
86  * There is only one Write list or one Reply chunk per Reply
87  * message.  The larger list is the Write list.
88  */
89 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
90 {
91         unsigned int size;
92
93         /* Fixed header fields and list discriminators */
94         size = RPCRDMA_HDRLEN_MIN;
95
96         /* Maximum Write list size */
97         size += sizeof(__be32);         /* segment count */
98         size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
99         size += sizeof(__be32); /* list discriminator */
100
101         return size;
102 }
103
104 /**
105  * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
106  * @ep: endpoint to initialize
107  *
108  * The max_inline fields contain the maximum size of an RPC message
109  * that can be conveyed inline, so the marshaling code doesn't have
110  * to repeat this calculation for every RPC.
111  */
112 void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
113 {
114         unsigned int maxsegs = ep->re_max_rdma_segs;
115
116         ep->re_max_inline_send =
117                 ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
118         ep->re_max_inline_recv =
119                 ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
120 }
121
122 /* The client can send a request inline as long as the RPCRDMA header
123  * plus the RPC call fit under the transport's inline limit. If the
124  * combined call message size exceeds that limit, the client must use
125  * a Read chunk for this operation.
126  *
127  * A Read chunk is also required if sending the RPC call inline would
128  * exceed this device's max_sge limit.
129  */
130 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
131                                 struct rpc_rqst *rqst)
132 {
133         struct xdr_buf *xdr = &rqst->rq_snd_buf;
134         struct rpcrdma_ep *ep = r_xprt->rx_ep;
135         unsigned int count, remaining, offset;
136
137         if (xdr->len > ep->re_max_inline_send)
138                 return false;
139
140         if (xdr->page_len) {
141                 remaining = xdr->page_len;
142                 offset = offset_in_page(xdr->page_base);
143                 count = RPCRDMA_MIN_SEND_SGES;
144                 while (remaining) {
145                         remaining -= min_t(unsigned int,
146                                            PAGE_SIZE - offset, remaining);
147                         offset = 0;
148                         if (++count > ep->re_attr.cap.max_send_sge)
149                                 return false;
150                 }
151         }
152
153         return true;
154 }
155
156 /* The client can't know how large the actual reply will be. Thus it
157  * plans for the largest possible reply for that particular ULP
158  * operation. If the maximum possible combined reply size exceeds the
159  * inline threshold, the client must provide a Write list or a Reply
160  * chunk for this request.
161  */
162 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
163                                    struct rpc_rqst *rqst)
164 {
165         return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
166 }
167
168 /* The client is required to provide a Reply chunk if the maximum
169  * size of the non-payload part of the RPC Reply is larger than
170  * the inline threshold.
171  */
172 static bool
173 rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
174                           const struct rpc_rqst *rqst)
175 {
176         const struct xdr_buf *buf = &rqst->rq_rcv_buf;
177
178         return (buf->head[0].iov_len + buf->tail[0].iov_len) <
179                 r_xprt->rx_ep->re_max_inline_recv;
180 }
181
182 /* Some ULP operations (the NFS ACL protocol, for example) are lazy
183  * about allocating receive pages. For TCP, these pages can be
184  * allocated during receive processing. Not true for RDMA, which
185  * must always provision receive buffers up front.
186  */
187 static noinline int
188 rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
189 {
190         struct page **ppages;
191         int len;
192
193         len = buf->page_len;
194         ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
195         while (len > 0) {
196                 if (!*ppages)
197                         *ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
198                 if (!*ppages)
199                         return -ENOBUFS;
200                 ppages++;
201                 len -= PAGE_SIZE;
202         }
203
204         return 0;
205 }
206
207 /* Split @vec on page boundaries into SGEs. Memory registration
208  * operates on whole pages rather than arbitrary byte ranges; the
209  * registration code coalesces these SGEs into a single MR when it can.
210  *
211  * Returns pointer to next available SGE, and bumps the total number
212  * of SGEs consumed.
213  */
214 static struct rpcrdma_mr_seg *
215 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
216                      unsigned int *n)
217 {
218         u32 remaining, page_offset;
219         char *base;
220
221         base = vec->iov_base;
222         page_offset = offset_in_page(base);
223         remaining = vec->iov_len;
224         while (remaining) {
225                 seg->mr_page = NULL;
226                 seg->mr_offset = base;
227                 seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
228                 remaining -= seg->mr_len;
229                 base += seg->mr_len;
230                 ++seg;
231                 ++(*n);
232                 page_offset = 0;
233         }
234         return seg;
235 }
236
237 /* Convert @xdrbuf into SGEs no larger than a page each. As they
238  * are registered, these SGEs are then coalesced into RDMA segments
239  * when the selected memreg mode supports it.
240  *
241  * Returns positive number of SGEs consumed, or a negative errno.
242  */
243
244 static int
245 rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
246                      unsigned int pos, enum rpcrdma_chunktype type,
247                      struct rpcrdma_mr_seg *seg)
248 {
249         unsigned long page_base;
250         unsigned int len, n;
251         struct page **ppages;
252
253         n = 0;
254         if (pos == 0)
255                 seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
256
257         len = xdrbuf->page_len;
258         ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
259         page_base = offset_in_page(xdrbuf->page_base);
260         while (len) {
261                 seg->mr_page = *ppages;
262                 seg->mr_offset = (char *)page_base;
263                 seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
264                 len -= seg->mr_len;
265                 ++ppages;
266                 ++seg;
267                 ++n;
268                 page_base = 0;
269         }
270
271         /* When encoding a Read chunk, the tail iovec contains an
272          * XDR pad and may be omitted.
273          */
274         if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
275                 goto out;
276
277         /* When encoding a Write chunk, some servers need to see an
278          * extra segment for non-XDR-aligned Write chunks. The upper
279          * layer provides space in the tail iovec that may be used
280          * for this purpose.
281          */
282         if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
283                 goto out;
284
285         if (xdrbuf->tail[0].iov_len)
286                 seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
287
288 out:
289         if (unlikely(n > RPCRDMA_MAX_SEGS))
290                 return -EIO;
291         return n;
292 }
293
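/* Encode a plain RDMA segment (handle, length, offset) at the
 * current position in the transport header's XDR stream.
 *
 * Returns zero on success, or -EMSGSIZE if no stream space remains.
 */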
294 static int
295 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
296 {
297         __be32 *p;
298
299         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
300         if (unlikely(!p))
301                 return -EMSGSIZE;
302
303         xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
304         return 0;
305 }
306
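/* Encode a Read segment: an item-present discriminator, the chunk
 * position, then the segment's handle, length, and offset.
 *
 * Returns zero on success, or -EMSGSIZE if no stream space remains.
 */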
307 static int
308 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
309                     u32 position)
310 {
311         __be32 *p;
312
313         p = xdr_reserve_space(xdr, 6 * sizeof(*p));
314         if (unlikely(!p))
315                 return -EMSGSIZE;
316
317         *p++ = xdr_one;                 /* Item present */
318         xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
319                                 mr->mr_offset);
320         return 0;
321 }
322
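/* Obtain an MR for the next portion of @seg: prefer the req's free
 * list, falling back to the transport's MR pool. The MR is moved to
 * req->rl_registered and used to register up to @nsegs segments.
 *
 * Returns a pointer to the segment following the registered range.
 * If no MR is available, a retry is arranged and ERR_PTR(-EAGAIN)
 * is returned.
 */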
323 static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
324                                                  struct rpcrdma_req *req,
325                                                  struct rpcrdma_mr_seg *seg,
326                                                  int nsegs, bool writing,
327                                                  struct rpcrdma_mr **mr)
328 {
329         *mr = rpcrdma_mr_pop(&req->rl_free_mrs);
330         if (!*mr) {
331                 *mr = rpcrdma_mr_get(r_xprt);
332                 if (!*mr)
333                         goto out_getmr_err;
334                 (*mr)->mr_req = req;
335         }
336
337         rpcrdma_mr_push(*mr, &req->rl_registered);
338         return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
339
340 out_getmr_err:
341         trace_xprtrdma_nomrs_err(r_xprt, req);
342         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
343         rpcrdma_mrs_refresh(r_xprt);
344         return ERR_PTR(-EAGAIN);
345 }
346
347 /* Register and XDR encode the Read list. Supports encoding a list of read
348  * segments that belong to a single read chunk.
349  *
350  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
351  *
352  *  Read chunklist (a linked list):
353  *   N elements, position P (same P for all chunks of same arg!):
354  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
355  *
356  * Returns zero on success, or a negative errno if a failure occurred.
357  * @xdr is advanced to the next position in the stream.
358  *
359  * Only a single @pos value is currently supported.
360  */
361 static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
362                                     struct rpcrdma_req *req,
363                                     struct rpc_rqst *rqst,
364                                     enum rpcrdma_chunktype rtype)
365 {
366         struct xdr_stream *xdr = &req->rl_stream;
367         struct rpcrdma_mr_seg *seg;
368         struct rpcrdma_mr *mr;
369         unsigned int pos;
370         int nsegs;
371
372         if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
373                 goto done;
374
375         pos = rqst->rq_snd_buf.head[0].iov_len;
376         if (rtype == rpcrdma_areadch)
377                 pos = 0;
378         seg = req->rl_segments;
379         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
380                                      rtype, seg);
381         if (nsegs < 0)
382                 return nsegs;
383
384         do {
385                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
386                 if (IS_ERR(seg))
387                         return PTR_ERR(seg);
388
389                 if (encode_read_segment(xdr, mr, pos) < 0)
390                         return -EMSGSIZE;
391
392                 trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
393                 r_xprt->rx_stats.read_chunk_count++;
394                 nsegs -= mr->mr_nents;
395         } while (nsegs);
396
397 done:
398         if (xdr_stream_encode_item_absent(xdr) < 0)
399                 return -EMSGSIZE;
400         return 0;
401 }
402
403 /* Register and XDR encode the Write list. Supports encoding a list
404  * containing one array of plain segments that belong to a single
405  * write chunk.
406  *
407  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
408  *
409  *  Write chunklist (a list of (one) counted array):
410  *   N elements:
411  *    1 - N - HLOO - HLOO - ... - HLOO - 0
412  *
413  * Returns zero on success, or a negative errno if a failure occurred.
414  * @xdr is advanced to the next position in the stream.
415  *
416  * Only a single Write chunk is currently supported.
417  */
418 static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
419                                      struct rpcrdma_req *req,
420                                      struct rpc_rqst *rqst,
421                                      enum rpcrdma_chunktype wtype)
422 {
423         struct xdr_stream *xdr = &req->rl_stream;
424         struct rpcrdma_mr_seg *seg;
425         struct rpcrdma_mr *mr;
426         int nsegs, nchunks;
427         __be32 *segcount;
428
429         if (wtype != rpcrdma_writech)
430                 goto done;
431
432         seg = req->rl_segments;
433         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
434                                      rqst->rq_rcv_buf.head[0].iov_len,
435                                      wtype, seg);
436         if (nsegs < 0)
437                 return nsegs;
438
439         if (xdr_stream_encode_item_present(xdr) < 0)
440                 return -EMSGSIZE;
441         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
442         if (unlikely(!segcount))
443                 return -EMSGSIZE;
444         /* Actual value encoded below */
445
446         nchunks = 0;
447         do {
448                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
449                 if (IS_ERR(seg))
450                         return PTR_ERR(seg);
451
452                 if (encode_rdma_segment(xdr, mr) < 0)
453                         return -EMSGSIZE;
454
455                 trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
456                 r_xprt->rx_stats.write_chunk_count++;
457                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
458                 nchunks++;
459                 nsegs -= mr->mr_nents;
460         } while (nsegs);
461
462         /* Update count of segments in this Write chunk */
463         *segcount = cpu_to_be32(nchunks);
464
465 done:
466         if (xdr_stream_encode_item_absent(xdr) < 0)
467                 return -EMSGSIZE;
468         return 0;
469 }
470
471 /* Register and XDR encode the Reply chunk. Supports encoding an array
472  * of plain segments that belong to a single write (reply) chunk.
473  *
474  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
475  *
476  *  Reply chunk (a counted array):
477  *   N elements:
478  *    1 - N - HLOO - HLOO - ... - HLOO
479  *
480  * Returns zero on success, or a negative errno if a failure occurred.
481  * @xdr is advanced to the next position in the stream.
482  */
483 static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
484                                       struct rpcrdma_req *req,
485                                       struct rpc_rqst *rqst,
486                                       enum rpcrdma_chunktype wtype)
487 {
488         struct xdr_stream *xdr = &req->rl_stream;
489         struct rpcrdma_mr_seg *seg;
490         struct rpcrdma_mr *mr;
491         int nsegs, nchunks;
492         __be32 *segcount;
493
494         if (wtype != rpcrdma_replych) {
495                 if (xdr_stream_encode_item_absent(xdr) < 0)
496                         return -EMSGSIZE;
497                 return 0;
498         }
499
500         seg = req->rl_segments;
501         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
502         if (nsegs < 0)
503                 return nsegs;
504
505         if (xdr_stream_encode_item_present(xdr) < 0)
506                 return -EMSGSIZE;
507         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
508         if (unlikely(!segcount))
509                 return -EMSGSIZE;
510         /* Actual value encoded below */
511
512         nchunks = 0;
513         do {
514                 seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
515                 if (IS_ERR(seg))
516                         return PTR_ERR(seg);
517
518                 if (encode_rdma_segment(xdr, mr) < 0)
519                         return -EMSGSIZE;
520
521                 trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
522                 r_xprt->rx_stats.reply_chunk_count++;
523                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
524                 nchunks++;
525                 nsegs -= mr->mr_nents;
526         } while (nsegs);
527
528         /* Update count of segments in the Reply chunk */
529         *segcount = cpu_to_be32(nchunks);
530
531         return 0;
532 }
533
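/* kref release callback: the Reply has already arrived and been
 * matched to this req; now that the Send's SGEs have been unmapped,
 * complete the RPC and count a Reply that had to wait for its Send.
 */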
534 static void rpcrdma_sendctx_done(struct kref *kref)
535 {
536         struct rpcrdma_req *req =
537                 container_of(kref, struct rpcrdma_req, rl_kref);
538         struct rpcrdma_rep *rep = req->rl_reply;
539
540         rpcrdma_complete_rqst(rep);
541         rep->rr_rxprt->rx_stats.reply_waits_for_send++;
542 }
543
544 /**
545  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
546  * @sc: sendctx containing SGEs to unmap
547  *
548  */
549 void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
550 {
551         struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
552         struct ib_sge *sge;
553
554         if (!sc->sc_unmap_count)
555                 return;
556
557         /* The first two SGEs contain the transport header and
558          * the inline buffer. These are always left mapped so
559          * they can be cheaply re-used.
560          */
561         for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
562              ++sge, --sc->sc_unmap_count)
563                 ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
564                                   DMA_TO_DEVICE);
565
566         kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
567 }
568
569 /* Prepare an SGE for the RPC-over-RDMA transport header.
570  */
571 static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
572                                     struct rpcrdma_req *req, u32 len)
573 {
574         struct rpcrdma_sendctx *sc = req->rl_sendctx;
575         struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
576         struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
577
578         sge->addr = rdmab_addr(rb);
579         sge->length = len;
580         sge->lkey = rdmab_lkey(rb);
581
582         ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
583                                       DMA_TO_DEVICE);
584 }
585
586 /* The head iovec is straightforward, as it is usually already
587  * DMA-mapped. Sync the content that has changed.
588  */
589 static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
590                                      struct rpcrdma_req *req, unsigned int len)
591 {
592         struct rpcrdma_sendctx *sc = req->rl_sendctx;
593         struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
594         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
595
596         if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
597                 return false;
598
599         sge->addr = rdmab_addr(rb);
600         sge->length = len;
601         sge->lkey = rdmab_lkey(rb);
602
603         ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
604                                       DMA_TO_DEVICE);
605         return true;
606 }
607
608 /* If there is a page list present, DMA map and prepare an
609  * SGE for each page to be sent.
610  */
611 static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
612                                      struct xdr_buf *xdr)
613 {
614         struct rpcrdma_sendctx *sc = req->rl_sendctx;
615         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
616         unsigned int page_base, len, remaining;
617         struct page **ppages;
618         struct ib_sge *sge;
619
620         ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
621         page_base = offset_in_page(xdr->page_base);
622         remaining = xdr->page_len;
623         while (remaining) {
624                 sge = &sc->sc_sges[req->rl_wr.num_sge++];
625                 len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
626                 sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
627                                             page_base, len, DMA_TO_DEVICE);
628                 if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
629                         goto out_mapping_err;
630
631                 sge->length = len;
632                 sge->lkey = rdmab_lkey(rb);
633
634                 sc->sc_unmap_count++;
635                 ppages++;
636                 remaining -= len;
637                 page_base = 0;
638         }
639
640         return true;
641
642 out_mapping_err:
643         trace_xprtrdma_dma_maperr(sge->addr);
644         return false;
645 }
646
647 /* The tail iovec may include an XDR pad for the page list,
648  * as well as additional content, and may not reside in the
649  * same page as the head iovec.
650  */
651 static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
652                                      struct xdr_buf *xdr,
653                                      unsigned int page_base, unsigned int len)
654 {
655         struct rpcrdma_sendctx *sc = req->rl_sendctx;
656         struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
657         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
658         struct page *page = virt_to_page(xdr->tail[0].iov_base);
659
660         sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
661                                     DMA_TO_DEVICE);
662         if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
663                 goto out_mapping_err;
664
665         sge->length = len;
666         sge->lkey = rdmab_lkey(rb);
667         ++sc->sc_unmap_count;
668         return true;
669
670 out_mapping_err:
671         trace_xprtrdma_dma_maperr(sge->addr);
672         return false;
673 }
674
675 /* Copy the tail to the end of the head buffer.
676  */
677 static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
678                                     struct rpcrdma_req *req,
679                                     struct xdr_buf *xdr)
680 {
681         unsigned char *dst;
682
683         dst = (unsigned char *)xdr->head[0].iov_base;
684         dst += xdr->head[0].iov_len + xdr->page_len;
685         memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
686         r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
687 }
688
689 /* Copy pagelist content into the head buffer.
690  */
691 static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
692                                     struct rpcrdma_req *req,
693                                     struct xdr_buf *xdr)
694 {
695         unsigned int len, page_base, remaining;
696         struct page **ppages;
697         unsigned char *src, *dst;
698
699         dst = (unsigned char *)xdr->head[0].iov_base;
700         dst += xdr->head[0].iov_len;
701         ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
702         page_base = offset_in_page(xdr->page_base);
703         remaining = xdr->page_len;
704         while (remaining) {
705                 src = page_address(*ppages);
706                 src += page_base;
707                 len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
708                 memcpy(dst, src, len);
709                 r_xprt->rx_stats.pullup_copy_count += len;
710
711                 ppages++;
712                 dst += len;
713                 remaining -= len;
714                 page_base = 0;
715         }
716 }
717
718 /* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
719  * When the head, pagelist, and tail are small, a pull-up copy
720  * is considerably less costly than DMA mapping the components
721  * of @xdr.
722  *
723  * Assumptions:
724  *  - the caller has already verified that the total length
725  *    of the RPC Call body will fit into @rl_sendbuf.
726  */
727 static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
728                                         struct rpcrdma_req *req,
729                                         struct xdr_buf *xdr)
730 {
731         if (unlikely(xdr->tail[0].iov_len))
732                 rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
733
734         if (unlikely(xdr->page_len))
735                 rpcrdma_pullup_pagelist(r_xprt, req, xdr);
736
737         /* The whole RPC message resides in the head iovec now */
738         return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
739 }
740
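/* DMA map each component of @xdr (head, page list, tail) for a Send
 * that carries the whole RPC Call without a pull-up copy. An extra
 * reference on @req is taken if any of these SGEs will need to be
 * unmapped after the Send completes.
 */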
741 static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
742                                         struct rpcrdma_req *req,
743                                         struct xdr_buf *xdr)
744 {
745         struct kvec *tail = &xdr->tail[0];
746
747         if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
748                 return false;
749         if (xdr->page_len)
750                 if (!rpcrdma_prepare_pagelist(req, xdr))
751                         return false;
752         if (tail->iov_len)
753                 if (!rpcrdma_prepare_tail_iov(req, xdr,
754                                               offset_in_page(tail->iov_base),
755                                               tail->iov_len))
756                         return false;
757
758         if (req->rl_sendctx->sc_unmap_count)
759                 kref_get(&req->rl_kref);
760         return true;
761 }
762
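/* Prepare the Send SGEs when a Read chunk conveys the page list:
 * only the head iovec, and any tail content that is more than an
 * XDR pad, are sent inline.
 */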
763 static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
764                                    struct rpcrdma_req *req,
765                                    struct xdr_buf *xdr)
766 {
767         if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
768                 return false;
769
770         /* If there is a Read chunk, the page list is being handled
771          * via explicit RDMA, and thus is skipped here.
772          */
773
774         /* Do not include the tail if it is only an XDR pad */
775         if (xdr->tail[0].iov_len > 3) {
776                 unsigned int page_base, len;
777
778                 /* If the content in the page list is an odd length,
779                  * xdr_write_pages() adds a pad at the beginning of
780                  * the tail iovec. Force the tail's non-pad content to
781                  * land at the next XDR position in the Send message.
782                  */
783                 page_base = offset_in_page(xdr->tail[0].iov_base);
784                 len = xdr->tail[0].iov_len;
785                 page_base += len & 3;
786                 len -= len & 3;
787                 if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
788                         return false;
789                 kref_get(&req->rl_kref);
790         }
791
792         return true;
793 }
794
795 /**
796  * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
797  * @r_xprt: controlling transport
798  * @req: context of RPC Call being marshalled
799  * @hdrlen: size of transport header, in bytes
800  * @xdr: xdr_buf containing RPC Call
801  * @rtype: chunk type being encoded
802  *
803  * Returns 0 on success; otherwise a negative errno is returned.
804  */
805 inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
806                                      struct rpcrdma_req *req, u32 hdrlen,
807                                      struct xdr_buf *xdr,
808                                      enum rpcrdma_chunktype rtype)
809 {
810         int ret;
811
812         ret = -EAGAIN;
813         req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
814         if (!req->rl_sendctx)
815                 goto out_nosc;
816         req->rl_sendctx->sc_unmap_count = 0;
817         req->rl_sendctx->sc_req = req;
818         kref_init(&req->rl_kref);
819         req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
820         req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
821         req->rl_wr.num_sge = 0;
822         req->rl_wr.opcode = IB_WR_SEND;
823
824         rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
825
826         ret = -EIO;
827         switch (rtype) {
828         case rpcrdma_noch_pullup:
829                 if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
830                         goto out_unmap;
831                 break;
832         case rpcrdma_noch_mapped:
833                 if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
834                         goto out_unmap;
835                 break;
836         case rpcrdma_readch:
837                 if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
838                         goto out_unmap;
839                 break;
840         case rpcrdma_areadch:
841                 break;
842         default:
843                 goto out_unmap;
844         }
845
846         return 0;
847
848 out_unmap:
849         rpcrdma_sendctx_unmap(req->rl_sendctx);
850 out_nosc:
851         trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
852         return ret;
853 }
854
855 /**
856  * rpcrdma_marshal_req - Marshal and send one RPC request
857  * @r_xprt: controlling transport
858  * @rqst: RPC request to be marshaled
859  *
860  * For the RPC in "rqst", this function:
861  *  - Chooses the transfer mode (e.g., RDMA_MSG or RDMA_NOMSG)
862  *  - Registers Read, Write, and Reply chunks
863  *  - Constructs the transport header
864  *  - Posts a Send WR to send the transport header and request
865  *
866  * Returns:
867  *      %0 if the RPC was sent successfully,
868  *      %-ENOTCONN if the connection was lost,
869  *      %-EAGAIN if the caller should call again with the same arguments,
870  *      %-ENOBUFS if the caller should call again after a delay,
871  *      %-EMSGSIZE if the transport header is too small,
872  *      %-EIO if a permanent problem occurred while marshaling.
873  */
874 int
875 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
876 {
877         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
878         struct xdr_stream *xdr = &req->rl_stream;
879         enum rpcrdma_chunktype rtype, wtype;
880         struct xdr_buf *buf = &rqst->rq_snd_buf;
881         bool ddp_allowed;
882         __be32 *p;
883         int ret;
884
885         if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
886                 ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
887                 if (ret)
888                         return ret;
889         }
890
891         rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
892         xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
893                         rqst);
894
895         /* Fixed header fields */
896         ret = -EMSGSIZE;
897         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
898         if (!p)
899                 goto out_err;
900         *p++ = rqst->rq_xid;
901         *p++ = rpcrdma_version;
902         *p++ = r_xprt->rx_buf.rb_max_requests;
903
904         /* When the ULP employs a GSS flavor that guarantees integrity
905          * or privacy, direct data placement of individual data items
906          * is not allowed.
907          */
908         ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
909                                 &rqst->rq_cred->cr_auth->au_flags);
910
911         /*
912          * Chunks needed for results?
913          *
914          * o If the expected result is under the inline threshold, all ops
915          *   return as inline.
916          * o Large read ops return data as write chunk(s), header as
917          *   inline.
918          * o Large non-read ops return as a single reply chunk.
919          */
920         if (rpcrdma_results_inline(r_xprt, rqst))
921                 wtype = rpcrdma_noch;
922         else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
923                  rpcrdma_nonpayload_inline(r_xprt, rqst))
924                 wtype = rpcrdma_writech;
925         else
926                 wtype = rpcrdma_replych;
927
928         /*
929          * Chunks needed for arguments?
930          *
931          * o If the total request is under the inline threshold, all ops
932          *   are sent as inline.
933          * o Large write ops transmit data as read chunk(s), header as
934          *   inline.
935          * o Large non-write ops are sent with the entire message as a
936          *   single read chunk (protocol 0-position special case).
937          *
938          * This assumes that the upper layer never presents a request
939          * that both carries a data payload and whose non-data arguments
940          * are, by themselves, larger than the inline threshold.
941          */
942         if (rpcrdma_args_inline(r_xprt, rqst)) {
943                 *p++ = rdma_msg;
944                 rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
945                         rpcrdma_noch_pullup : rpcrdma_noch_mapped;
946         } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
947                 *p++ = rdma_msg;
948                 rtype = rpcrdma_readch;
949         } else {
950                 r_xprt->rx_stats.nomsg_call_count++;
951                 *p++ = rdma_nomsg;
952                 rtype = rpcrdma_areadch;
953         }
954
955         /* This implementation supports the following combinations
956          * of chunk lists in one RPC-over-RDMA Call message:
957          *
958          *   - Read list
959          *   - Write list
960          *   - Reply chunk
961          *   - Read list + Reply chunk
962          *
963          * It might not yet support the following combinations:
964          *
965          *   - Read list + Write list
966          *
967          * It does not support the following combinations:
968          *
969          *   - Write list + Reply chunk
970          *   - Read list + Write list + Reply chunk
971          *
972          * This implementation supports only a single chunk in each
973          * Read or Write list. Thus for example the client cannot
974          * send a Call message with a Position Zero Read chunk and a
975          * regular Read chunk at the same time.
976          */
977         ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
978         if (ret)
979                 goto out_err;
980         ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
981         if (ret)
982                 goto out_err;
983         ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
984         if (ret)
985                 goto out_err;
986
987         ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
988                                         buf, rtype);
989         if (ret)
990                 goto out_err;
991
992         trace_xprtrdma_marshal(req, rtype, wtype);
993         return 0;
994
995 out_err:
996         trace_xprtrdma_marshal_failed(rqst, ret);
997         r_xprt->rx_stats.failed_marshal_count++;
998         frwr_reset(req);
999         return ret;
1000 }
1001
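/* Caller holds xprt->transport_lock. Record the server's credit
 * grant and convert it into an RPC-layer congestion window.
 */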
1002 static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
1003                                          struct rpcrdma_buffer *buf,
1004                                          u32 grant)
1005 {
1006         buf->rb_credits = grant;
1007         xprt->cwnd = grant << RPC_CWNDSHIFT;
1008 }
1009
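/* Update the credit grant under the transport lock.
 */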
1010 static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
1011 {
1012         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1013
1014         spin_lock(&xprt->transport_lock);
1015         __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
1016         spin_unlock(&xprt->transport_lock);
1017 }
1018
1019 /**
1020  * rpcrdma_reset_cwnd - Reset the xprt's congestion window
1021  * @r_xprt: controlling transport instance
1022  *
1023  * Prepare @r_xprt for the next connection by reinitializing
1024  * its credit grant to one (see RFC 8166, Section 3.3.3).
1025  */
1026 void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
1027 {
1028         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1029
1030         spin_lock(&xprt->transport_lock);
1031         xprt->cong = 0;
1032         __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
1033         spin_unlock(&xprt->transport_lock);
1034 }
1035
1036 /**
1037  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
1038  * @rqst: controlling RPC request
1039  * @srcp: points to RPC message payload in receive buffer
1040  * @copy_len: remaining length of receive buffer content
1041  * @pad: Write chunk pad bytes needed (zero for pure inline)
1042  *
1043  * The upper layer has set the maximum number of bytes it can
1044  * receive in each component of rq_rcv_buf. These values are set in
1045  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
1046  *
1047  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
1048  * many cases this function simply updates iov_base pointers in
1049  * rq_rcv_buf to point directly at the received reply data,
1050  * avoiding a copy.
1051  *
1052  * Returns the count of bytes which had to be memcopied.
1053  */
1054 static unsigned long
1055 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
1056 {
1057         unsigned long fixup_copy_count;
1058         int i, npages, curlen;
1059         char *destp;
1060         struct page **ppages;
1061         int page_base;
1062
1063         /* The head iovec is redirected to the RPC reply message
1064          * in the receive buffer, to avoid a memcopy.
1065          */
1066         rqst->rq_rcv_buf.head[0].iov_base = srcp;
1067         rqst->rq_private_buf.head[0].iov_base = srcp;
1068
1069         /* The contents of the receive buffer that follow
1070          * head.iov_len bytes are copied into the page list.
1071          */
1072         curlen = rqst->rq_rcv_buf.head[0].iov_len;
1073         if (curlen > copy_len)
1074                 curlen = copy_len;
1075         srcp += curlen;
1076         copy_len -= curlen;
1077
1078         ppages = rqst->rq_rcv_buf.pages +
1079                 (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
1080         page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
1081         fixup_copy_count = 0;
1082         if (copy_len && rqst->rq_rcv_buf.page_len) {
1083                 int pagelist_len;
1084
1085                 pagelist_len = rqst->rq_rcv_buf.page_len;
1086                 if (pagelist_len > copy_len)
1087                         pagelist_len = copy_len;
1088                 npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
1089                 for (i = 0; i < npages; i++) {
1090                         curlen = PAGE_SIZE - page_base;
1091                         if (curlen > pagelist_len)
1092                                 curlen = pagelist_len;
1093
1094                         destp = kmap_atomic(ppages[i]);
1095                         memcpy(destp + page_base, srcp, curlen);
1096                         flush_dcache_page(ppages[i]);
1097                         kunmap_atomic(destp);
1098                         srcp += curlen;
1099                         copy_len -= curlen;
1100                         fixup_copy_count += curlen;
1101                         pagelist_len -= curlen;
1102                         if (!pagelist_len)
1103                                 break;
1104                         page_base = 0;
1105                 }
1106
1107                 /* Implicit padding for the last segment in a Write
1108                  * chunk is inserted inline at the front of the tail
1109                  * iovec. The upper layer ignores the content of
1110                  * the pad. Simply ensure inline content in the tail
1111                  * that follows the Write chunk is properly aligned.
1112                  */
1113                 if (pad)
1114                         srcp -= pad;
1115         }
1116
1117         /* The tail iovec is redirected to the remaining data
1118          * in the receive buffer, to avoid a memcopy.
1119          */
1120         if (copy_len || pad) {
1121                 rqst->rq_rcv_buf.tail[0].iov_base = srcp;
1122                 rqst->rq_private_buf.tail[0].iov_base = srcp;
1123         }
1124
1125         if (fixup_copy_count)
1126                 trace_xprtrdma_fixup(rqst, fixup_copy_count);
1127         return fixup_copy_count;
1128 }
1129
1130 /* By convention, backchannel calls arrive via rdma_msg type
1131  * messages, and never populate the chunk lists. This makes
1132  * the RPC/RDMA header small and fixed in size, so it is
1133  * straightforward to check the RPC header's direction field.
1134  */
1135 static bool
1136 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1137 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1138 {
1139         struct xdr_stream *xdr = &rep->rr_stream;
1140         __be32 *p;
1141
1142         if (rep->rr_proc != rdma_msg)
1143                 return false;
1144
1145         /* Peek at stream contents without advancing. */
1146         p = xdr_inline_decode(xdr, 0);
1147
1148         /* Chunk lists */
1149         if (xdr_item_is_present(p++))
1150                 return false;
1151         if (xdr_item_is_present(p++))
1152                 return false;
1153         if (xdr_item_is_present(p++))
1154                 return false;
1155
1156         /* RPC header */
1157         if (*p++ != rep->rr_xid)
1158                 return false;
1159         if (*p != cpu_to_be32(RPC_CALL))
1160                 return false;
1161
1162         /* Now that we are sure this is a backchannel call,
1163          * advance to the RPC header.
1164          */
1165         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1166         if (unlikely(!p))
1167                 goto out_short;
1168
1169         rpcrdma_bc_receive_call(r_xprt, rep);
1170         return true;
1171
1172 out_short:
1173         pr_warn("RPC/RDMA short backward direction call\n");
1174         return true;
1175 }
1176 #else   /* CONFIG_SUNRPC_BACKCHANNEL */
1177 {
1178         return false;
1179 }
1180 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
1181
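/* Decode one RDMA segment, returning its byte count in @length.
 * Returns -EIO if the stream is too short.
 */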
1182 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1183 {
1184         u32 handle;
1185         u64 offset;
1186         __be32 *p;
1187
1188         p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1189         if (unlikely(!p))
1190                 return -EIO;
1191
1192         xdr_decode_rdma_segment(p, &handle, length, &offset);
1193         trace_xprtrdma_decode_seg(handle, *length, offset);
1194         return 0;
1195 }
1196
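/* Decode a Write chunk (a counted array of RDMA segments), returning
 * the sum of the segment lengths in @length.
 */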
1197 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1198 {
1199         u32 segcount, seglength;
1200         __be32 *p;
1201
1202         p = xdr_inline_decode(xdr, sizeof(*p));
1203         if (unlikely(!p))
1204                 return -EIO;
1205
1206         *length = 0;
1207         segcount = be32_to_cpup(p);
1208         while (segcount--) {
1209                 if (decode_rdma_segment(xdr, &seglength))
1210                         return -EIO;
1211                 *length += seglength;
1212         }
1213
1214         return 0;
1215 }
1216
1217 /* In RPC-over-RDMA Version One replies, a Read list is never
1218  * expected. This decoder is a stub that returns an error if
1219  * a Read list is present.
1220  */
1221 static int decode_read_list(struct xdr_stream *xdr)
1222 {
1223         __be32 *p;
1224
1225         p = xdr_inline_decode(xdr, sizeof(*p));
1226         if (unlikely(!p))
1227                 return -EIO;
1228         if (unlikely(xdr_item_is_present(p)))
1229                 return -EIO;
1230         return 0;
1231 }
1232
1233 /* Supports only one Write chunk in the Write list
1234  */
1235 static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1236 {
1237         u32 chunklen;
1238         bool first;
1239         __be32 *p;
1240
1241         *length = 0;
1242         first = true;
1243         do {
1244                 p = xdr_inline_decode(xdr, sizeof(*p));
1245                 if (unlikely(!p))
1246                         return -EIO;
1247                 if (xdr_item_is_absent(p))
1248                         break;
1249                 if (!first)
1250                         return -EIO;
1251
1252                 if (decode_write_chunk(xdr, &chunklen))
1253                         return -EIO;
1254                 *length += chunklen;
1255                 first = false;
1256         } while (true);
1257         return 0;
1258 }
1259
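/* The Reply chunk is optional: decode it if present, returning its
 * total payload length in @length, or zero if it is absent.
 */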
1260 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1261 {
1262         __be32 *p;
1263
1264         p = xdr_inline_decode(xdr, sizeof(*p));
1265         if (unlikely(!p))
1266                 return -EIO;
1267
1268         *length = 0;
1269         if (xdr_item_is_present(p))
1270                 if (decode_write_chunk(xdr, length))
1271                         return -EIO;
1272         return 0;
1273 }
1274
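/* Handle an RDMA_MSG-type Reply: the chunk lists are decoded, then
 * the inline RPC message that follows the transport header is
 * scattered into rqst->rq_rcv_buf.
 *
 * Returns the total number of RPC Reply bytes, including any Write
 * chunk payload, or a negative errno.
 */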
1275 static int
1276 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1277                    struct rpc_rqst *rqst)
1278 {
1279         struct xdr_stream *xdr = &rep->rr_stream;
1280         u32 writelist, replychunk, rpclen;
1281         char *base;
1282
1283         /* Decode the chunk lists */
1284         if (decode_read_list(xdr))
1285                 return -EIO;
1286         if (decode_write_list(xdr, &writelist))
1287                 return -EIO;
1288         if (decode_reply_chunk(xdr, &replychunk))
1289                 return -EIO;
1290
1291         /* RDMA_MSG sanity checks */
1292         if (unlikely(replychunk))
1293                 return -EIO;
1294
1295         /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1296         base = (char *)xdr_inline_decode(xdr, 0);
1297         rpclen = xdr_stream_remaining(xdr);
1298         r_xprt->rx_stats.fixup_copy_count +=
1299                 rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1300
1301         r_xprt->rx_stats.total_rdma_reply += writelist;
1302         return rpclen + xdr_align_size(writelist);
1303 }
1304
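/* Handle an RDMA_NOMSG-type Reply: the entire RPC Reply was
 * transferred via the Reply chunk, so there is no inline payload
 * to fix up.
 *
 * Returns the Reply chunk's length, or a negative errno.
 */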
1305 static noinline int
1306 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1307 {
1308         struct xdr_stream *xdr = &rep->rr_stream;
1309         u32 writelist, replychunk;
1310
1311         /* Decode the chunk lists */
1312         if (decode_read_list(xdr))
1313                 return -EIO;
1314         if (decode_write_list(xdr, &writelist))
1315                 return -EIO;
1316         if (decode_reply_chunk(xdr, &replychunk))
1317                 return -EIO;
1318
1319         /* RDMA_NOMSG sanity checks */
1320         if (unlikely(writelist))
1321                 return -EIO;
1322         if (unlikely(!replychunk))
1323                 return -EIO;
1324
1325         /* Reply chunk buffer already is the reply vector */
1326         r_xprt->rx_stats.total_rdma_reply += replychunk;
1327         return replychunk;
1328 }
1329
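/* Handle an RDMA_ERROR-type Reply: record the error the server
 * reported via tracepoints, then return -EIO so the caller can
 * fail the RPC.
 */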
1330 static noinline int
1331 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1332                      struct rpc_rqst *rqst)
1333 {
1334         struct xdr_stream *xdr = &rep->rr_stream;
1335         __be32 *p;
1336
1337         p = xdr_inline_decode(xdr, sizeof(*p));
1338         if (unlikely(!p))
1339                 return -EIO;
1340
1341         switch (*p) {
1342         case err_vers:
1343                 p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1344                 if (!p)
1345                         break;
1346                 trace_xprtrdma_err_vers(rqst, p, p + 1);
1347                 break;
1348         case err_chunk:
1349                 trace_xprtrdma_err_chunk(rqst);
1350                 break;
1351         default:
1352                 trace_xprtrdma_err_unrecognized(rqst, p);
1353         }
1354
1355         return -EIO;
1356 }
1357
1358 /* Perform XID lookup, reconstruction of the RPC reply, and
1359  * RPC completion while holding the transport lock to ensure
1360  * the rep, rqst, and rq_task pointers remain stable.
1361  */
1362 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1363 {
1364         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1365         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1366         struct rpc_rqst *rqst = rep->rr_rqst;
1367         int status;
1368
1369         switch (rep->rr_proc) {
1370         case rdma_msg:
1371                 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1372                 break;
1373         case rdma_nomsg:
1374                 status = rpcrdma_decode_nomsg(r_xprt, rep);
1375                 break;
1376         case rdma_error:
1377                 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1378                 break;
1379         default:
1380                 status = -EIO;
1381         }
1382         if (status < 0)
1383                 goto out_badheader;
1384
1385 out:
1386         spin_lock(&xprt->queue_lock);
1387         xprt_complete_rqst(rqst->rq_task, status);
1388         xprt_unpin_rqst(rqst);
1389         spin_unlock(&xprt->queue_lock);
1390         return;
1391
1392 out_badheader:
1393         trace_xprtrdma_reply_hdr_err(rep);
1394         r_xprt->rx_stats.bad_reply_count++;
1395         rqst->rq_task->tk_status = status;
1396         status = 0;
1397         goto out;
1398 }
1399
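/* kref release callback: the Send carrying the matching Call has
 * already been unmapped (or needed no unmapping), so the received
 * Reply can be processed now.
 */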
1400 static void rpcrdma_reply_done(struct kref *kref)
1401 {
1402         struct rpcrdma_req *req =
1403                 container_of(kref, struct rpcrdma_req, rl_kref);
1404
1405         rpcrdma_complete_rqst(req->rl_reply);
1406 }
1407
1408 /**
1409  * rpcrdma_reply_handler - Process received RPC/RDMA messages
1410  * @rep: Incoming rpcrdma_rep object to process
1411  *
1412  * Errors must result in the RPC task either being awakened, or
1413  * allowed to timeout, to discover the errors at that time.
1414  */
1415 void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1416 {
1417         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1418         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1419         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1420         struct rpcrdma_req *req;
1421         struct rpc_rqst *rqst;
1422         u32 credits;
1423         __be32 *p;
1424
1425         /* Any data means we had a useful conversation, so there
1426          * is no need to delay the next reconnect.
1427          */
1428         if (xprt->reestablish_timeout)
1429                 xprt->reestablish_timeout = 0;
1430
1431         /* Fixed transport header fields */
1432         xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1433                         rep->rr_hdrbuf.head[0].iov_base, NULL);
1434         p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1435         if (unlikely(!p))
1436                 goto out_shortreply;
1437         rep->rr_xid = *p++;
1438         rep->rr_vers = *p++;
1439         credits = be32_to_cpu(*p++);
1440         rep->rr_proc = *p++;
1441
1442         if (rep->rr_vers != rpcrdma_version)
1443                 goto out_badversion;
1444
1445         if (rpcrdma_is_bcall(r_xprt, rep))
1446                 return;
1447
1448         /* Match incoming rpcrdma_rep to an rpcrdma_req to
1449          * get context for handling any incoming chunks.
1450          */
1451         spin_lock(&xprt->queue_lock);
1452         rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1453         if (!rqst)
1454                 goto out_norqst;
1455         xprt_pin_rqst(rqst);
1456         spin_unlock(&xprt->queue_lock);
1457
1458         if (credits == 0)
1459                 credits = 1;    /* don't deadlock */
1460         else if (credits > r_xprt->rx_ep->re_max_requests)
1461                 credits = r_xprt->rx_ep->re_max_requests;
1462         if (buf->rb_credits != credits)
1463                 rpcrdma_update_cwnd(r_xprt, credits);
1464         rpcrdma_post_recvs(r_xprt, false);
1465
1466         req = rpcr_to_rdmar(rqst);
1467         if (unlikely(req->rl_reply))
1468                 rpcrdma_recv_buffer_put(req->rl_reply);
1469         req->rl_reply = rep;
1470         rep->rr_rqst = rqst;
1471
1472         trace_xprtrdma_reply(rqst->rq_task, rep, credits);
1473
1474         if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1475                 frwr_reminv(rep, &req->rl_registered);
1476         if (!list_empty(&req->rl_registered))
1477                 frwr_unmap_async(r_xprt, req);
1478                 /* LocalInv completion will complete the RPC */
1479         else
1480                 kref_put(&req->rl_kref, rpcrdma_reply_done);
1481         return;
1482
1483 out_badversion:
1484         trace_xprtrdma_reply_vers_err(rep);
1485         goto out;
1486
1487 out_norqst:
1488         spin_unlock(&xprt->queue_lock);
1489         trace_xprtrdma_reply_rqst_err(rep);
1490         goto out;
1491
1492 out_shortreply:
1493         trace_xprtrdma_reply_short_err(rep);
1494
1495 out:
1496         rpcrdma_recv_buffer_put(rep);
1497 }