xprtrdma: Use smaller buffers for RPC-over-RDMA headers
net/sunrpc/xprtrdma/backchannel.c
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

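/* Tear down one pre-allocated backchannel rpc_rqst: unlink its
 * rpcrdma_req from the transport's list of requests (protected by
 * rb_reqslock), release the req and its registered buffers, then
 * free the rpc_rqst itself.
 */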
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(&r_xprt->rx_ia, req);

        kfree(rqst);
}

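/* Allocate the resources backing one backchannel rpc_rqst: an
 * rpcrdma_req, a small registered buffer for the RPC-over-RDMA
 * header, and an inline-sized send buffer for the RPC reply.
 */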
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->rl_backchannel = true;

        rb = rpcrdma_alloc_regbuf(ia, RPCRDMA_HDRBUF_SIZE, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        size = r_xprt->rx_data.inline_rsize;
        rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_sendbuf = rb;
        xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
        rpcrdma_set_xprtdata(rqst, req);
        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        struct rpcrdma_rep *rep;
        int rc = 0;

        while (count--) {
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        pr_err("RPC:       %s: reply buffer alloc failed\n",
                               __func__);
                        rc = PTR_ERR(rep);
                        break;
                }

                rpcrdma_recv_buffer_put(rep);
        }

        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }
                dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }

        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
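        /* Make sure the server-side RDMA transport module is loaded;
         * it provides the "rdma-bc" transport class that
         * xprt_rdma_bc_up() relies on below.
         */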
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
        int ret;

        ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
        if (ret < 0)
                return ret;
        return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        size_t maxmsg;

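        /* A backchannel call and its reply must each fit in a single
         * inline buffer; the fixed RPC-over-RDMA header consumes
         * RPCRDMA_HDRLEN_MIN bytes of that space.
         */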
        maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
        return maxmsg - RPCRDMA_HDRLEN_MIN;
}

/**
 * rpcrdma_bc_marshal_reply - Send backwards direction reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;
        size_t rpclen;

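        /* Build the fixed-size RPC-over-RDMA header for the reply:
         * the original XID, protocol version, a credit grant, the
         * rdma_msg procedure, and three empty (xdr_zero) chunk lists.
         */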
        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        rpclen = rqst->rq_svec[0].iov_len;

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
                __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
        pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
        pr_info("RPC:       %s:      RPC: %*ph\n",
                __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
#endif

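        /* The Send WR uses two elements of rl_send_iov: the first
         * covers the RPC-over-RDMA header, the second covers the
         * marshaled RPC reply in the send buffer.
         */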
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
        req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
        req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

        req->rl_niovs = 2;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;

        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;

        dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
                __func__, rqst, rpcr_to_rdmar(rqst));

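        /* The paired memory barriers order updates to the rqst around
         * clearing RPC_BC_PA_IN_USE, before the rqst is placed back on
         * bc_pa_list for reuse.
         */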
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (e.g., NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
        dprintk("RPC:       %s: using rqst %p\n", __func__, rqst);

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;

        rqst->rq_private_buf.len = size;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
        dprintk("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
        req->rl_reply = rep;

        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC:       %s: reposting rep %p\n",
                        __func__, rep);
}