SUNRPC: Use rpc_ntop() for constructing transport address strings
[linux-2.6-block.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/xprtsock.h>
36 #include <linux/file.h>
37 #ifdef CONFIG_NFS_V4_1
38 #include <linux/sunrpc/bc_xprt.h>
39 #endif
40
41 #include <net/sock.h>
42 #include <net/checksum.h>
43 #include <net/udp.h>
44 #include <net/tcp.h>
45
/*
 * xprtsock tunables
 *
 * Exposed under /proc/sys/sunrpc when RPC_DEBUG is enabled (see
 * xs_tunables_table below).
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

/* Range limits for the reserved (privileged) source port */
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* How long (in jiffies) to wait for a TCP FIN before forcing a close */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
57
58 /*
59  * We can register our own files under /proc/sys/sunrpc by
60  * calling register_sysctl_table() again.  The files in that
61  * directory become the union of all files registered there.
62  *
63  * We simply need to make sure that we don't collide with
64  * someone else's file names!
65  */
66
67 #ifdef RPC_DEBUG
68
69 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
70 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
71 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
72 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
73
74 static struct ctl_table_header *sunrpc_table_header;
75
/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	/* Number of concurrent in-flight slots for UDP transports */
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	/* Number of concurrent in-flight slots for TCP transports */
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	/* Lower bound of the reserved source-port range */
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	/* Upper bound of the reserved source-port range */
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	/*
	 * TCP linger timeout, stored in jiffies, presented in seconds.
	 * NOTE(review): this entry has no .ctl_name, unlike the others --
	 * presumably it relies on the unnumbered-sysctl convention; confirm.
	 */
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_jiffies,
		.strategy	= sysctl_jiffies
	},
	/* Table terminator */
	{
		.ctl_name = 0,
	},
};
137
/* Parent directory: /proc/sys/sunrpc, holding the tunables above */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	/* Table terminator */
	{
		.ctl_name = 0,
	},
};
149
150 #endif
151
/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.  (All timeouts below are in jiffies.)
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
/* Define RPC_DEBUG_DATA to get on-the-wire hexdumps via xs_pktdump() */
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
200
#ifdef RPC_DEBUG_DATA
/*
 * Dump up to the first 128 bytes of an RPC packet to the debug log:
 * 32 bytes per line, grouped four bytes per hex cluster, each line
 * prefixed with its byte offset.
 */
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		/* Start a fresh output line every 32 bytes */
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
/* Packet dumping compiled out unless RPC_DEBUG_DATA is defined */
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
225
/*
 * Socket-based RPC transport: embeds the generic rpc_xprt and adds the
 * socket plus TCP record-reassembly state.
 */
struct sock_xprt {
	struct rpc_xprt		xprt;	/* must stay first: code here maps
					 * rpc_xprt back via container_of() */

	/*
	 * Network layer
	 */
	struct socket *		sock;	/* the kernel socket */
	struct sock *		inet;	/* presumably sock->sk, cached when the
					 * socket is set up -- TODO confirm */

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,	/* record marker being read */
				tcp_xid;	/* XID being read (net order) */

	u32			tcp_offset,	/* bytes consumed of current item */
				tcp_reclen;	/* length of current fragment */

	unsigned long		tcp_copied,	/* bytes copied to rcv buffer */
				tcp_flags;	/* TCP_RCV_ / TCP_RPC_ bits */

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;	/* deferred (re)connect work */
	struct sockaddr_storage	addr;	/* NOTE(review): presumably the local
					 * address used when binding; set up
					 * outside this excerpt -- confirm */
	unsigned short		port;	/* see note on addr above */

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses, restored by
	 * xs_restore_old_callbacks() when the transport lets go of the socket
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
	void			(*old_error_report)(struct sock *);
};
268
/*
 * TCP receive state flags -- kept in sock_xprt::tcp_flags and driving
 * the record-reassembly state machine.
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags
 *
 * This shares the tcp_flags word with the receive-state bits above,
 * hence bit 6 rather than restarting at bit 0.
 */
#define TCP_RPC_REPLY		(1UL << 6)
283
284 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
285 {
286         return (struct sockaddr *) &xprt->addr;
287 }
288
289 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
290 {
291         return (struct sockaddr_in *) &xprt->addr;
292 }
293
294 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
295 {
296         return (struct sockaddr_in6 *) &xprt->addr;
297 }
298
/*
 * Build the family-independent display strings for this transport's
 * peer.  The caller must have filled in RPC_DISPLAY_PROTO first (see
 * xs_format_ipv4/ipv6_peer_addresses).  Each string is kstrdup()ed;
 * a failed allocation simply leaves that slot NULL.
 */
static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	/* Presentation-format address, e.g. "192.168.0.1" or "fe80::1" */
	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	/* Port number in decimal */
	(void)snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	/* One-line "addr=... port=... proto=..." summary */
	(void)snprintf(buf, sizeof(buf), "addr=%s port=%s proto=%s",
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT],
			xprt->address_strings[RPC_DISPLAY_PROTO]);
	xprt->address_strings[RPC_DISPLAY_ALL] = kstrdup(buf, GFP_KERNEL);

	/*
	 * Port in hex.  NOTE(review): "%4hx" pads with spaces, not zeros;
	 * if consumers of RPC_DISPLAY_HEX_PORT expect a fixed-width
	 * zero-padded value this should be "%04hx" -- confirm.
	 */
	(void)snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}
319
/*
 * Fill in the display strings for an IPv4 peer.
 *
 * @protocol and @netid must outlive the transport (static strings):
 * xs_free_peer_addresses() deliberately does not free those two slots.
 */
static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
					  const char *protocol,
					  const char *netid)
{
	struct sockaddr_in *sin = xs_addr_in(xprt);
	char buf[16];

	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;

	/* Address as 8 hex digits, bytes emitted in network order */
	(void)snprintf(buf, sizeof(buf), "%02x%02x%02x%02x",
				NIPQUAD(sin->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xs_format_common_peer_addresses(xprt);
}
336
/*
 * Fill in the display strings for an IPv6 peer.
 *
 * @protocol and @netid must outlive the transport (static strings):
 * xs_free_peer_addresses() deliberately does not free those two slots.
 */
static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
					  const char *protocol,
					  const char *netid)
{
	struct sockaddr_in6 *sin6 = xs_addr_in6(xprt);
	char buf[48];

	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;

	/* "%pi6" is the kernel's hex (no-colon) IPv6 printk extension */
	(void)snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xs_format_common_peer_addresses(xprt);
}
352
353 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
354 {
355         unsigned int i;
356
357         for (i = 0; i < RPC_DISPLAY_MAX; i++)
358                 switch (i) {
359                 case RPC_DISPLAY_PROTO:
360                 case RPC_DISPLAY_NETID:
361                         continue;
362                 default:
363                         kfree(xprt->address_strings[i]);
364                 }
365 }
366
367 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
368
369 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
370 {
371         struct msghdr msg = {
372                 .msg_name       = addr,
373                 .msg_namelen    = addrlen,
374                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
375         };
376         struct kvec iov = {
377                 .iov_base       = vec->iov_base + base,
378                 .iov_len        = vec->iov_len - base,
379         };
380
381         if (iov.iov_len != 0)
382                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
383         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
384 }
385
/*
 * Transmit the page portion of an xdr_buf via sendpage(), resuming
 * @base bytes into the page data.  Returns bytes sent, or the last
 * sendpage() error if nothing at all went out.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	/* Translate the logical offset into a page index + in-page offset */
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		/* More pages (or a tail) coming: let TCP coalesce */
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		/* Stop on completion, error, or a short (partial) send */
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;	/* nothing sent: propagate error/short count */
	if (err > 0)
		sent += err;	/* fold in the final (possibly partial) send */
	return sent;
}
416
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 * Sends the head kvec, the page data, and the tail kvec in order,
 * resuming @base bytes into the buffer.  Returns bytes sent, or a
 * negative errno if nothing was sent at all.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	/*
	 * base != 0 means we are resuming a partial transmission;
	 * presumably the destination address was already consumed by the
	 * first sendmsg, so don't pass it again -- TODO confirm.
	 */
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	/* Head kvec first (always entered for a fresh UDP datagram) */
	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	/* Then the page data */
	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	/* Finally the tail kvec */
	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
472
473 static void xs_nospace_callback(struct rpc_task *task)
474 {
475         struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
476
477         transport->inet->sk_write_pending--;
478         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
479 }
480
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Returns -EAGAIN when the task has been queued to wait for buffer
 * space, -ENOTCONN when the transport disconnected in the meantime,
 * or 0 if the NOSPACE condition already cleared.
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = 0;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			ret = -EAGAIN;
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
521
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* The server's port must be known before we can send */
	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}
	if (!transport->sock)
		goto out;

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
out:
	return status;
}
586
587 /**
588  * xs_tcp_shutdown - gracefully shut down a TCP socket
589  * @xprt: transport
590  *
591  * Initiates a graceful shutdown of the TCP socket by calling the
592  * equivalent of shutdown(SHUT_WR);
593  */
594 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
595 {
596         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
597         struct socket *sock = transport->sock;
598
599         if (sock != NULL)
600                 kernel_sock_shutdown(sock, SHUT_WR);
601 }
602
603 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
604 {
605         u32 reclen = buf->len - sizeof(rpc_fraghdr);
606         rpc_fraghdr *base = buf->head[0].iov_base;
607         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
608 }
609
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Keep looping as long as we make forward progress */
		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}
	if (!transport->sock)
		goto out;

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
		/* fall through */
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}
out:
	return status;
}
689
/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	/* Only the task that currently owns the transport may release it */
	if (task != xprt->snd_task)
		return;
	/* task == snd_task == NULL: nothing was in flight */
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	/* Nothing sent yet, or the whole record went out: socket is clean */
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	/* A partial record is on the wire; mark the connection for reset */
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}
716
/* Remember the socket's original callbacks so they can be restored later */
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}
724
/* Put back the callbacks saved by xs_save_old_callbacks() */
static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}
732
/*
 * Detach and release the transport's socket: unhook our callbacks
 * under sk_callback_lock, then drop the socket reference.  Safe to
 * call when no socket is attached.
 */
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)
		return;

	/* Clear our pointers under the lock so callbacks can't race us */
	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}
754
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);

	/* Barriers pair the flag updates with concurrent state readers */
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}
780
781 static void xs_tcp_close(struct rpc_xprt *xprt)
782 {
783         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
784                 xs_close(xprt);
785         else
786                 xs_tcp_shutdown(xprt);
787 }
788
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 * Cancels any pending connect work, closes the socket, and frees the
 * transport's memory and module reference.
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	/* Make sure the connect worker cannot run or requeue itself */
	cancel_rearming_delayed_work(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	kfree(xprt->slot);
	kfree(xprt);
	module_put(THIS_MODULE);
}
808
809 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
810 {
811         return (struct rpc_xprt *) sk->sk_user_data;
812 }
813
/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Runs under sk->sk_callback_lock; pulls one datagram off the socket,
 * matches its XID against a pending request, copies the reply in, and
 * completes that request.
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* A reply must contain at least a 4-byte XID */
	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	/* Never copy more than the receive buffer can hold */
	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	/* Something worked... */
	dst_confirm(skb_dst(skb));

	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}
885
/*
 * Read (possibly incrementally) the 4-byte RPC record marker from the
 * TCP stream, then latch the fragment length and last-fragment flag
 * into the transport's receive state machine.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	/* Resume filling the marker at tcp_offset bytes in */
	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;		/* marker still incomplete; wait for more data */

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	/* Marker fully read: next step is the XID, offset restarts at 0 */
	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xprt_force_disconnect(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}
918
919 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
920 {
921         if (transport->tcp_offset == transport->tcp_reclen) {
922                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
923                 transport->tcp_offset = 0;
924                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
925                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
926                         transport->tcp_flags |= TCP_RCV_COPY_XID;
927                         transport->tcp_copied = 0;
928                 }
929         }
930 }
931
/*
 * Read the 4-byte XID of the current record, accumulating it across
 * calls until complete, then advance to reading the CALL/REPLY
 * direction word.
 */
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
        size_t len, used;
        char *p;

        len = sizeof(transport->tcp_xid) - transport->tcp_offset;
        dprintk("RPC:       reading XID (%Zu bytes)\n", len);
        p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
        used = xdr_skb_read_bits(desc, p, len);
        transport->tcp_offset += used;
        if (used != len)
                return;         /* XID incomplete; wait for more data */
        transport->tcp_flags &= ~TCP_RCV_COPY_XID;
        transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
        /* The XID counts towards the bytes copied into the receive buffer */
        transport->tcp_copied = 4;
        /*
         * NOTE(review): TCP_RPC_REPLY below still reflects the PREVIOUS
         * record; the direction of this record is only established later
         * in xs_tcp_read_calldir().  The dprintk label may be stale.
         */
        dprintk("RPC:       reading %s XID %08x\n",
                        (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
                                                              : "request with",
                        ntohl(transport->tcp_xid));
        xs_tcp_check_fraghdr(transport);
}
953
/*
 * Read the 4-byte CALL/REPLY direction word that immediately follows
 * the XID, and record the direction in tcp_flags (TCP_RPC_REPLY).
 */
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
                                       struct xdr_skb_reader *desc)
{
        size_t len, used;
        u32 offset;
        __be32  calldir;

        /*
         * We want transport->tcp_offset to be 8 at the end of this routine
         * (4 bytes for the xid and 4 bytes for the call/reply flag).
         * When this function is called for the first time,
         * transport->tcp_offset is 4 (after having already read the xid).
         */
        offset = transport->tcp_offset - sizeof(transport->tcp_xid);
        len = sizeof(calldir) - offset;
        dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
        /*
         * NOTE(review): calldir is a stack local and the read below always
         * writes from its first byte.  If the direction word arrives split
         * across two invocations (offset != 0 on re-entry), the bytes read
         * by the earlier call are lost and the word is assembled wrongly —
         * confirm whether a split here is possible and, if so, persist the
         * partial word in the transport instead.
         */
        used = xdr_skb_read_bits(desc, &calldir, len);
        transport->tcp_offset += used;
        if (used != len)
                return;
        transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
        transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
        transport->tcp_flags |= TCP_RCV_COPY_DATA;
        /*
         * We don't yet have the XDR buffer, so we will write the calldir
         * out after we get the buffer from the 'struct rpc_rqst'
         */
        if (ntohl(calldir) == RPC_REPLY)
                transport->tcp_flags |= TCP_RPC_REPLY;
        else
                transport->tcp_flags &= ~TCP_RPC_REPLY;
        /* note: calldir is printed in wire (big-endian) byte order */
        dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
                        (transport->tcp_flags & TCP_RPC_REPLY) ?
                                "reply for" : "request with", calldir);
        xs_tcp_check_fraghdr(transport);
}
990
/*
 * Copy record payload from the skb into the request's private receive
 * buffer.  Handles three jobs: (1) writing the saved call direction
 * into the XDR buffer once it is available, (2) copying at most the
 * remainder of the current record fragment, and (3) deciding when the
 * request has received all the data it is going to get.
 *
 * Caller must hold xprt->transport_lock (protects @req).
 */
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
                                     struct xdr_skb_reader *desc,
                                     struct rpc_rqst *req)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *rcvbuf;
        size_t len;
        ssize_t r;

        rcvbuf = &req->rq_private_buf;

        if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
                /*
                 * Save the RPC direction in the XDR buffer
                 */
                __be32  calldir = transport->tcp_flags & TCP_RPC_REPLY ?
                                        htonl(RPC_REPLY) : 0;

                memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
                        &calldir, sizeof(calldir));
                transport->tcp_copied += sizeof(calldir);
                transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
        }

        len = desc->count;
        if (len > transport->tcp_reclen - transport->tcp_offset) {
                struct xdr_skb_reader my_desc;

                /*
                 * The skb holds more than the rest of this fragment:
                 * clamp the copy with a bounded shadow descriptor and
                 * advance the real one by what was actually consumed.
                 */
                len = transport->tcp_reclen - transport->tcp_offset;
                memcpy(&my_desc, desc, sizeof(my_desc));
                my_desc.count = len;
                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
                                          &my_desc, xdr_skb_read_bits);
                desc->count -= r;
                desc->offset += r;
        } else
                r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
                                          desc, xdr_skb_read_bits);

        if (r > 0) {
                transport->tcp_copied += r;
                transport->tcp_offset += r;
        }
        if (r != len) {
                /* Error when copying to the receive buffer,
                 * usually because we weren't able to allocate
                 * additional buffer pages. All we can do now
                 * is turn off TCP_RCV_COPY_DATA, so the request
                 * will not receive any additional updates,
                 * and time out.
                 * Any remaining data from this record will
                 * be discarded.
                 */
                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
                dprintk("RPC:       XID %08x truncated request\n",
                                ntohl(transport->tcp_xid));
                dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
                                "tcp_offset = %u, tcp_reclen = %u\n",
                                xprt, transport->tcp_copied,
                                transport->tcp_offset, transport->tcp_reclen);
                return;
        }

        dprintk("RPC:       XID %08x read %Zd bytes\n",
                        ntohl(transport->tcp_xid), r);
        dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
                        "tcp_reclen = %u\n", xprt, transport->tcp_copied,
                        transport->tcp_offset, transport->tcp_reclen);

        /*
         * Stop copying either when the receive buffer is full or when
         * the last fragment of the record has been fully consumed.
         */
        if (transport->tcp_copied == req->rq_private_buf.buflen)
                transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        else if (transport->tcp_offset == transport->tcp_reclen) {
                if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
                        transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
        }

        return;
}
1070
1071 /*
1072  * Finds the request corresponding to the RPC xid and invokes the common
1073  * tcp read code to read the data.
1074  */
/*
 * Handle an inbound RPC reply: look up the pending request by XID and
 * feed the payload to xs_tcp_read_common().  Returns 0 on success, -1
 * when no request matches the XID (reply is then dropped by the caller).
 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
                                    struct xdr_skb_reader *desc)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct rpc_rqst *req;

        dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));

        /* Find and lock the request corresponding to this xid */
        spin_lock(&xprt->transport_lock);
        req = xprt_lookup_rqst(xprt, transport->tcp_xid);
        if (!req) {
                dprintk("RPC:       XID %08x request not found!\n",
                                ntohl(transport->tcp_xid));
                spin_unlock(&xprt->transport_lock);
                return -1;
        }

        xs_tcp_read_common(xprt, desc, req);

        /* Once no more data is expected, complete the waiting task */
        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
                xprt_complete_rqst(req->rq_task, transport->tcp_copied);

        spin_unlock(&xprt->transport_lock);
        return 0;
}
1102
1103 #if defined(CONFIG_NFS_V4_1)
1104 /*
1105  * Obtains an rpc_rqst previously allocated and invokes the common
1106  * tcp read code to read the data.  The result is placed in the callback
1107  * queue.
1108  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1109  * connection and return -1.
1110  */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
                                       struct xdr_skb_reader *desc)
{
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
        struct rpc_rqst *req;

        /* Grab a preallocated backchannel request slot */
        req = xprt_alloc_bc_request(xprt);
        if (req == NULL) {
                printk(KERN_WARNING "Callback slot table overflowed\n");
                xprt_force_disconnect(xprt);
                return -1;
        }

        req->rq_xid = transport->tcp_xid;
        dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
        xs_tcp_read_common(xprt, desc, req);

        if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
                struct svc_serv *bc_serv = xprt->bc_serv;

                /*
                 * Add callback request to callback list.  The callback
                 * service sleeps on the sv_cb_waitq waiting for new
                 * requests.  Wake it up after enqueuing the request.
                 */
                dprintk("RPC:       add callback request to list\n");
                spin_lock(&bc_serv->sv_cb_lock);
                list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
                spin_unlock(&bc_serv->sv_cb_lock);
                wake_up(&bc_serv->sv_cb_waitq);
        }

        req->rq_private_buf.len = transport->tcp_copied;

        return 0;
}
1149
1150 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1151                                         struct xdr_skb_reader *desc)
1152 {
1153         struct sock_xprt *transport =
1154                                 container_of(xprt, struct sock_xprt, xprt);
1155
1156         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1157                 xs_tcp_read_reply(xprt, desc) :
1158                 xs_tcp_read_callback(xprt, desc);
1159 }
1160 #else
/*
 * Without NFSv4.1 backchannel support, every inbound message is
 * necessarily a reply to one of our own calls.
 */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
                                        struct xdr_skb_reader *desc)
{
        return xs_tcp_read_reply(xprt, desc);
}
1166 #endif /* CONFIG_NFS_V4_1 */
1167
1168 /*
1169  * Read data off the transport.  This can be either an RPC_CALL or an
1170  * RPC_REPLY.  Relay the processing to helper functions.
1171  */
1172 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1173                                     struct xdr_skb_reader *desc)
1174 {
1175         struct sock_xprt *transport =
1176                                 container_of(xprt, struct sock_xprt, xprt);
1177
1178         if (_xs_tcp_read_data(xprt, desc) == 0)
1179                 xs_tcp_check_fraghdr(transport);
1180         else {
1181                 /*
1182                  * The transport_lock protects the request handling.
1183                  * There's no need to hold it to update the tcp_flags.
1184                  */
1185                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1186         }
1187 }
1188
1189 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1190 {
1191         size_t len;
1192
1193         len = transport->tcp_reclen - transport->tcp_offset;
1194         if (len > desc->count)
1195                 len = desc->count;
1196         desc->count -= len;
1197         desc->offset += len;
1198         transport->tcp_offset += len;
1199         dprintk("RPC:       discarded %Zu bytes\n", len);
1200         xs_tcp_check_fraghdr(transport);
1201 }
1202
/*
 * tcp_read_sock() actor: drive the TCP receive state machine over one
 * skb.  Each pass of the loop handles whichever phase the tcp_flags
 * bits indicate (fragment marker -> XID -> call direction -> payload),
 * falling through to discard when none apply.  Returns the number of
 * bytes consumed from the skb.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
        struct rpc_xprt *xprt = rd_desc->arg.data;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_skb_reader desc = {
                .skb    = skb,
                .offset = offset,
                .count  = len,
        };

        dprintk("RPC:       xs_tcp_data_recv started\n");
        do {
                /* Read in a new fragment marker if necessary */
                /* Can we ever really expect to get completely empty fragments? */
                if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
                        xs_tcp_read_fraghdr(xprt, &desc);
                        continue;
                }
                /* Read in the xid if necessary */
                if (transport->tcp_flags & TCP_RCV_COPY_XID) {
                        xs_tcp_read_xid(transport, &desc);
                        continue;
                }
                /* Read in the call/reply flag */
                if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
                        xs_tcp_read_calldir(transport, &desc);
                        continue;
                }
                /* Read in the request data */
                if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
                        xs_tcp_read_data(xprt, &desc);
                        continue;
                }
                /* Skip over any trailing bytes on short reads */
                xs_tcp_read_discard(transport, &desc);
        } while (desc.count);
        dprintk("RPC:       xs_tcp_data_recv done\n");
        return len - desc.count;
}
1242
1243 /**
1244  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1245  * @sk: socket with data to read
1246  * @bytes: how much data to read
1247  *
1248  */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
        struct rpc_xprt *xprt;
        read_descriptor_t rd_desc;
        int read;

        dprintk("RPC:       xs_tcp_data_ready...\n");

        /* sk_callback_lock keeps sk_user_data (the xprt) stable */
        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        if (xprt->shutdown)
                goto out;

        /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        rd_desc.arg.data = xprt;
        do {
                /* refresh the budget each pass; loop until drained */
                rd_desc.count = 65536;
                read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
        } while (read > 0);
out:
        read_unlock(&sk->sk_callback_lock);
}
1272
1273 /*
1274  * Do the equivalent of linger/linger2 handling for dealing with
1275  * broken servers that don't close the socket in a timely
1276  * fashion
1277  */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
                unsigned long timeout)
{
        struct sock_xprt *transport;

        /* Reuse the connecting bit to serialize against real connects */
        if (xprt_test_and_set_connecting(xprt))
                return;
        set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        transport = container_of(xprt, struct sock_xprt, xprt);
        /* The connect worker will abort the socket once the timeout fires */
        queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
                           timeout);
}
1290
static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
        struct sock_xprt *transport;

        transport = container_of(xprt, struct sock_xprt, xprt);

        /*
         * Only undo the state set by xs_tcp_schedule_linger_timeout(),
         * and only if we managed to cancel the work before it ran.
         */
        if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
            !cancel_delayed_work(&transport->connect_worker))
                return;
        clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
        xprt_clear_connecting(xprt);
}
1303
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
        /* Barriers order these bit clears against surrounding state tests */
        smp_mb__before_clear_bit();
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        clear_bit(XPRT_CLOSING, &xprt->state);
        smp_mb__after_clear_bit();
        /* Mark transport as closed and wake up all pending tasks */
        xprt_disconnect_done(xprt);
}
1313
1314 /**
1315  * xs_tcp_state_change - callback to handle TCP socket state changes
1316  * @sk: socket whose state has changed
1317  *
1318  */
static void xs_tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
        dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
                        sk->sk_state, xprt_connected(xprt),
                        sock_flag(sk, SOCK_DEAD),
                        sock_flag(sk, SOCK_ZAPPED));

        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
                spin_lock_bh(&xprt->transport_lock);
                if (!xprt_test_and_set_connected(xprt)) {
                        struct sock_xprt *transport = container_of(xprt,
                                        struct sock_xprt, xprt);

                        /* Reset TCP record info */
                        transport->tcp_offset = 0;
                        transport->tcp_reclen = 0;
                        transport->tcp_copied = 0;
                        transport->tcp_flags =
                                TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

                        xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock_bh(&xprt->transport_lock);
                break;
        case TCP_FIN_WAIT1:
                /* The client initiated a shutdown of the socket */
                xprt->connect_cookie++;
                xprt->reestablish_timeout = 0;
                set_bit(XPRT_CLOSING, &xprt->state);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
                smp_mb__after_clear_bit();
                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                break;
        case TCP_CLOSE_WAIT:
                /* The server initiated a shutdown of the socket */
                xprt_force_disconnect(xprt);
                /* fall through */
        case TCP_SYN_SENT:
                xprt->connect_cookie++;
                /* fall through */
        case TCP_CLOSING:
                /*
                 * If the server closed down the connection, make sure that
                 * we back off before reconnecting
                 */
                if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
                        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
                break;
        case TCP_LAST_ACK:
                set_bit(XPRT_CLOSING, &xprt->state);
                xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
                smp_mb__before_clear_bit();
                clear_bit(XPRT_CONNECTED, &xprt->state);
                smp_mb__after_clear_bit();
                break;
        case TCP_CLOSE:
                xs_tcp_cancel_linger_timeout(xprt);
                xs_sock_mark_closed(xprt);
        }
 out:
        read_unlock(&sk->sk_callback_lock);
}
1388
1389 /**
1390  * xs_error_report - callback mainly for catching socket errors
1391  * @sk: socket
1392  */
static void xs_error_report(struct sock *sk)
{
        struct rpc_xprt *xprt;

        read_lock(&sk->sk_callback_lock);
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:       %s client %p...\n"
                        "RPC:       error %d\n",
                        __func__, xprt, sk->sk_err);
        /* Kick waiting tasks so they can notice and retry */
        xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
        read_unlock(&sk->sk_callback_lock);
}
1407
1408 static void xs_write_space(struct sock *sk)
1409 {
1410         struct socket *sock;
1411         struct rpc_xprt *xprt;
1412
1413         if (unlikely(!(sock = sk->sk_socket)))
1414                 return;
1415         clear_bit(SOCK_NOSPACE, &sock->flags);
1416
1417         if (unlikely(!(xprt = xprt_from_sock(sk))))
1418                 return;
1419         if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1420                 return;
1421
1422         xprt_write_space(xprt);
1423 }
1424
1425 /**
1426  * xs_udp_write_space - callback invoked when socket buffer space
1427  *                             becomes available
1428  * @sk: socket whose state has changed
1429  *
1430  * Called when more output buffer space is available for this socket.
1431  * We try not to wake our writers until they can make "significant"
1432  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1433  * with a bunch of small requests.
1434  */
static void xs_udp_write_space(struct sock *sk)
{
        read_lock(&sk->sk_callback_lock);

        /* from net/core/sock.c:sock_def_write_space */
        if (sock_writeable(sk))
                xs_write_space(sk);

        read_unlock(&sk->sk_callback_lock);
}
1445
1446 /**
1447  * xs_tcp_write_space - callback invoked when socket buffer space
1448  *                             becomes available
1449  * @sk: socket whose state has changed
1450  *
1451  * Called when more output buffer space is available for this socket.
1452  * We try not to wake our writers until they can make "significant"
1453  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1454  * with a bunch of small requests.
1455  */
static void xs_tcp_write_space(struct sock *sk)
{
        read_lock(&sk->sk_callback_lock);

        /* from net/core/stream.c:sk_stream_write_space */
        if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
                xs_write_space(sk);

        read_unlock(&sk->sk_callback_lock);
}
1466
/*
 * Apply the transport's requested send/receive buffer sizes to the
 * underlying socket.  Sizes are scaled by xprt->max_reqs so every
 * outstanding request has room; the userlocks bits pin the values
 * against the kernel's buffer autotuning.
 */
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sock *sk = transport->inet;

        if (transport->rcvsize) {
                sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
                sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
        }
        if (transport->sndsize) {
                sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
                sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
                /* wake any writers now that more space may be available */
                sk->sk_write_space(sk);
        }
}
1482
1483 /**
1484  * xs_udp_set_buffer_size - set send and receive limits
1485  * @xprt: generic transport
1486  * @sndsize: requested size of send buffer, in bytes
1487  * @rcvsize: requested size of receive buffer, in bytes
1488  *
1489  * Set socket send and receive buffer size limits.
1490  */
1491 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1492 {
1493         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1494
1495         transport->sndsize = 0;
1496         if (sndsize)
1497                 transport->sndsize = sndsize + 1024;
1498         transport->rcvsize = 0;
1499         if (rcvsize)
1500                 transport->rcvsize = rcvsize + 1024;
1501
1502         xs_udp_do_set_buffer_size(xprt);
1503 }
1504
1505 /**
1506  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1507  * @task: task that timed out
1508  *
1509  * Adjust the congestion window after a retransmit timeout has occurred.
1510  */
static void xs_udp_timer(struct rpc_task *task)
{
        /* Let the congestion-window logic react to the timeout */
        xprt_adjust_cwnd(task, -ETIMEDOUT);
}
1515
1516 static unsigned short xs_get_random_port(void)
1517 {
1518         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1519         unsigned short rand = (unsigned short) net_random() % range;
1520         return rand + xprt_min_resvport;
1521 }
1522
1523 /**
1524  * xs_set_port - reset the port number in the remote endpoint address
1525  * @xprt: generic transport
1526  * @port: new port number
1527  *
1528  */
1529 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1530 {
1531         struct sockaddr *addr = xs_addr(xprt);
1532
1533         dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1534
1535         switch (addr->sa_family) {
1536         case AF_INET:
1537                 ((struct sockaddr_in *)addr)->sin_port = htons(port);
1538                 break;
1539         case AF_INET6:
1540                 ((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1541                 break;
1542         default:
1543                 BUG();
1544         }
1545 }
1546
1547 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1548 {
1549         unsigned short port = transport->port;
1550
1551         if (port == 0 && transport->xprt.resvport)
1552                 port = xs_get_random_port();
1553         return port;
1554 }
1555
1556 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1557 {
1558         if (transport->port != 0)
1559                 transport->port = 0;
1560         if (!transport->xprt.resvport)
1561                 return 0;
1562         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1563                 return xprt_max_resvport;
1564         return --port;
1565 }
1566
/*
 * Bind @sock to a local IPv4 source port, retrying downwards through
 * the reserved range on EADDRINUSE.  nloop counts wrap-arounds so we
 * give up after the second pass over the range.
 */
static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
{
        struct sockaddr_in myaddr = {
                .sin_family = AF_INET,
        };
        struct sockaddr_in *sa;
        int err, nloop = 0;
        unsigned short port = xs_get_srcport(transport, sock);
        unsigned short last;

        sa = (struct sockaddr_in *)&transport->addr;
        myaddr.sin_addr = sa->sin_addr;
        do {
                myaddr.sin_port = htons(port);
                err = kernel_bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
                if (port == 0)
                        break;          /* wildcard port: take what we got */
                if (err == 0) {
                        /* remember the port for reuse on reconnect */
                        transport->port = port;
                        break;
                }
                last = port;
                port = xs_next_srcport(transport, sock, port);
                if (port > last)
                        nloop++;        /* wrapped around the reserved range */
        } while (err == -EADDRINUSE && nloop != 2);
        dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
                        __func__, &myaddr.sin_addr,
                        port, err ? "failed" : "ok", err);
        return err;
}
1599
1600 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1601 {
1602         struct sockaddr_in6 myaddr = {
1603                 .sin6_family = AF_INET6,
1604         };
1605         struct sockaddr_in6 *sa;
1606         int err, nloop = 0;
1607         unsigned short port = xs_get_srcport(transport, sock);
1608         unsigned short last;
1609
1610         sa = (struct sockaddr_in6 *)&transport->addr;
1611         myaddr.sin6_addr = sa->sin6_addr;
1612         do {
1613                 myaddr.sin6_port = htons(port);
1614                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1615                                                 sizeof(myaddr));
1616                 if (port == 0)
1617                         break;
1618                 if (err == 0) {
1619                         transport->port = port;
1620                         break;
1621                 }
1622                 last = port;
1623                 port = xs_next_srcport(transport, sock, port);
1624                 if (port > last)
1625                         nloop++;
1626         } while (err == -EADDRINUSE && nloop != 2);
1627         dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1628                 &myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1629         return err;
1630 }
1631
1632 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1633 static struct lock_class_key xs_key[2];
1634 static struct lock_class_key xs_slock_key[2];
1635
/*
 * Give RPC-owned IPv4 sockets their own lockdep classes so lock
 * dependency tracking does not conflate them with ordinary sockets.
 */
static inline void xs_reclassify_socket4(struct socket *sock)
{
        struct sock *sk = sock->sk;

        /* reclassifying is only safe before anyone else can lock the sock */
        BUG_ON(sock_owned_by_user(sk));
        sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
                &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}
1644
/*
 * Give RPC-owned IPv6 sockets their own lockdep classes so lock
 * dependency tracking does not conflate them with ordinary sockets.
 */
static inline void xs_reclassify_socket6(struct socket *sock)
{
        struct sock *sk = sock->sk;

        /* reclassifying is only safe before anyone else can lock the sock */
        BUG_ON(sock_owned_by_user(sk));
        sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
                &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
1653 #else
static inline void xs_reclassify_socket4(struct socket *sock)
{
        /* no-op when lockdep is not configured */
}
1657
static inline void xs_reclassify_socket6(struct socket *sock)
{
        /* no-op when lockdep is not configured */
}
1661 #endif
1662
/*
 * Attach a freshly bound UDP socket to the transport: install our
 * callbacks under sk_callback_lock (saving the originals first), mark
 * the transport connected, then apply any requested buffer sizes.
 * Idempotent once transport->inet is set.
 */
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

        if (!transport->inet) {
                struct sock *sk = sock->sk;

                write_lock_bh(&sk->sk_callback_lock);

                xs_save_old_callbacks(transport, sk);

                sk->sk_user_data = xprt;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
                sk->sk_error_report = xs_error_report;
                sk->sk_no_check = UDP_CSUM_NORCV;
                /* callbacks may allocate in softirq context */
                sk->sk_allocation = GFP_ATOMIC;

                xprt_set_connected(xprt);

                /* Reset to new socket */
                transport->sock = sock;
                transport->inet = sk;

                write_unlock_bh(&sk->sk_callback_lock);
        }
        xs_udp_do_set_buffer_size(xprt);
}
1691
1692 /**
1693  * xs_udp_connect_worker4 - set up a UDP socket
1694  * @work: RPC transport to connect
1695  *
1696  * Invoked by a work queue tasklet.
1697  */
1698 static void xs_udp_connect_worker4(struct work_struct *work)
1699 {
1700         struct sock_xprt *transport =
1701                 container_of(work, struct sock_xprt, connect_worker.work);
1702         struct rpc_xprt *xprt = &transport->xprt;
1703         struct socket *sock = transport->sock;
1704         int err, status = -EIO;
1705
1706         if (xprt->shutdown)
1707                 goto out;
1708
1709         /* Start by resetting any existing state */
1710         xs_reset_transport(transport);
1711
1712         err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1713         if (err < 0) {
1714                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1715                 goto out;
1716         }
1717         xs_reclassify_socket4(sock);
1718
1719         if (xs_bind4(transport, sock)) {
1720                 sock_release(sock);
1721                 goto out;
1722         }
1723
1724         dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1725                         xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1726
1727         xs_udp_finish_connecting(xprt, sock);
1728         status = 0;
1729 out:
1730         xprt_clear_connecting(xprt);
1731         xprt_wake_pending_tasks(xprt, status);
1732 }
1733
1734 /**
1735  * xs_udp_connect_worker6 - set up a UDP socket
1736  * @work: RPC transport to connect
1737  *
1738  * Invoked by a work queue tasklet.
1739  */
1740 static void xs_udp_connect_worker6(struct work_struct *work)
1741 {
1742         struct sock_xprt *transport =
1743                 container_of(work, struct sock_xprt, connect_worker.work);
1744         struct rpc_xprt *xprt = &transport->xprt;
1745         struct socket *sock = transport->sock;
1746         int err, status = -EIO;
1747
1748         if (xprt->shutdown)
1749                 goto out;
1750
1751         /* Start by resetting any existing state */
1752         xs_reset_transport(transport);
1753
1754         err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1755         if (err < 0) {
1756                 dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1757                 goto out;
1758         }
1759         xs_reclassify_socket6(sock);
1760
1761         if (xs_bind6(transport, sock) < 0) {
1762                 sock_release(sock);
1763                 goto out;
1764         }
1765
1766         dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1767                         xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1768
1769         xs_udp_finish_connecting(xprt, sock);
1770         status = 0;
1771 out:
1772         xprt_clear_connecting(xprt);
1773         xprt_wake_pending_tasks(xprt, status);
1774 }
1775
1776 /*
1777  * We need to preserve the port number so the reply cache on the server can
1778  * find our cached RPC replies when we get around to reconnecting.
1779  */
1780 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1781 {
1782         int result;
1783         struct sockaddr any;
1784
1785         dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1786
1787         /*
1788          * Disconnect the transport socket by doing a connect operation
1789          * with AF_UNSPEC.  This should return immediately...
1790          */
1791         memset(&any, 0, sizeof(any));
1792         any.sa_family = AF_UNSPEC;
1793         result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1794         if (!result)
1795                 xs_sock_mark_closed(xprt);
1796         else
1797                 dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1798                                 result);
1799 }
1800
1801 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1802 {
1803         unsigned int state = transport->inet->sk_state;
1804
1805         if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1806                 return;
1807         if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1808                 return;
1809         xs_abort_connection(xprt, transport);
1810 }
1811
/*
 * Attach @sock to @xprt and start a non-blocking TCP connect to the
 * remote peer.  On the first call for this socket the transport's
 * callbacks and TCP-level socket options are installed.
 *
 * Returns -ENOTCONN if the transport has no bound destination yet,
 * otherwise the result of kernel_connect() (typically -EINPROGRESS
 * while a non-blocking connect is still in flight).
 */
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		/* Serialize against the socket's callback paths */
		write_lock_bh(&sk->sk_callback_lock);

		/* Remember the original callbacks so close can restore them */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		sk->sk_error_report = xs_error_report;
		/* Socket callbacks may run in contexts that cannot sleep */
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		/* Keep the bound source port across reconnects, so the
		 * server's reply cache still recognizes us */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		/* Disable Nagle: RPC requests should be sent immediately */
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
}
1853
1854 /**
1855  * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
1856  * @xprt: RPC transport to connect
1857  * @transport: socket transport to connect
1858  * @create_sock: function to create a socket of the correct type
1859  *
1860  * Invoked by a work queue tasklet.
1861  */
1862 static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
1863                 struct sock_xprt *transport,
1864                 struct socket *(*create_sock)(struct rpc_xprt *,
1865                         struct sock_xprt *))
1866 {
1867         struct socket *sock = transport->sock;
1868         int status = -EIO;
1869
1870         if (xprt->shutdown)
1871                 goto out;
1872
1873         if (!sock) {
1874                 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1875                 sock = create_sock(xprt, transport);
1876                 if (IS_ERR(sock)) {
1877                         status = PTR_ERR(sock);
1878                         goto out;
1879                 }
1880         } else {
1881                 int abort_and_exit;
1882
1883                 abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
1884                                 &xprt->state);
1885                 /* "close" the socket, preserving the local port */
1886                 xs_tcp_reuse_connection(xprt, transport);
1887
1888                 if (abort_and_exit)
1889                         goto out_eagain;
1890         }
1891
1892         dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1893                         xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1894
1895         status = xs_tcp_finish_connecting(xprt, sock);
1896         dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
1897                         xprt, -status, xprt_connected(xprt),
1898                         sock->sk->sk_state);
1899         switch (status) {
1900         default:
1901                 printk("%s: connect returned unhandled error %d\n",
1902                         __func__, status);
1903         case -EADDRNOTAVAIL:
1904                 /* We're probably in TIME_WAIT. Get rid of existing socket,
1905                  * and retry
1906                  */
1907                 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
1908                 xprt_force_disconnect(xprt);
1909                 break;
1910         case -ECONNREFUSED:
1911         case -ECONNRESET:
1912         case -ENETUNREACH:
1913                 /* retry with existing socket, after a delay */
1914         case 0:
1915         case -EINPROGRESS:
1916         case -EALREADY:
1917                 xprt_clear_connecting(xprt);
1918                 return;
1919         }
1920 out_eagain:
1921         status = -EAGAIN;
1922 out:
1923         xprt_clear_connecting(xprt);
1924         xprt_wake_pending_tasks(xprt, status);
1925 }
1926
1927 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1928                 struct sock_xprt *transport)
1929 {
1930         struct socket *sock;
1931         int err;
1932
1933         /* start from scratch */
1934         err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1935         if (err < 0) {
1936                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1937                                 -err);
1938                 goto out_err;
1939         }
1940         xs_reclassify_socket4(sock);
1941
1942         if (xs_bind4(transport, sock) < 0) {
1943                 sock_release(sock);
1944                 goto out_err;
1945         }
1946         return sock;
1947 out_err:
1948         return ERR_PTR(-EIO);
1949 }
1950
1951 /**
1952  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1953  * @work: RPC transport to connect
1954  *
1955  * Invoked by a work queue tasklet.
1956  */
1957 static void xs_tcp_connect_worker4(struct work_struct *work)
1958 {
1959         struct sock_xprt *transport =
1960                 container_of(work, struct sock_xprt, connect_worker.work);
1961         struct rpc_xprt *xprt = &transport->xprt;
1962
1963         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1964 }
1965
1966 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1967                 struct sock_xprt *transport)
1968 {
1969         struct socket *sock;
1970         int err;
1971
1972         /* start from scratch */
1973         err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1974         if (err < 0) {
1975                 dprintk("RPC:       can't create TCP transport socket (%d).\n",
1976                                 -err);
1977                 goto out_err;
1978         }
1979         xs_reclassify_socket6(sock);
1980
1981         if (xs_bind6(transport, sock) < 0) {
1982                 sock_release(sock);
1983                 goto out_err;
1984         }
1985         return sock;
1986 out_err:
1987         return ERR_PTR(-EIO);
1988 }
1989
1990 /**
1991  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1992  * @work: RPC transport to connect
1993  *
1994  * Invoked by a work queue tasklet.
1995  */
1996 static void xs_tcp_connect_worker6(struct work_struct *work)
1997 {
1998         struct sock_xprt *transport =
1999                 container_of(work, struct sock_xprt, connect_worker.work);
2000         struct rpc_xprt *xprt = &transport->xprt;
2001
2002         xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
2003 }
2004
2005 /**
2006  * xs_connect - connect a socket to a remote endpoint
2007  * @task: address of RPC task that manages state of connect request
2008  *
2009  * TCP: If the remote end dropped the connection, delay reconnecting.
2010  *
2011  * UDP socket connects are synchronous, but we use a work queue anyway
2012  * to guarantee that even unprivileged user processes can set up a
2013  * socket on a privileged port.
2014  *
2015  * If a UDP socket connect fails, the delay behavior here prevents
2016  * retry floods (hard mounts).
2017  */
2018 static void xs_connect(struct rpc_task *task)
2019 {
2020         struct rpc_xprt *xprt = task->tk_xprt;
2021         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2022
2023         if (xprt_test_and_set_connecting(xprt))
2024                 return;
2025
2026         if (transport->sock != NULL) {
2027                 dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2028                                 "seconds\n",
2029                                 xprt, xprt->reestablish_timeout / HZ);
2030                 queue_delayed_work(rpciod_workqueue,
2031                                    &transport->connect_worker,
2032                                    xprt->reestablish_timeout);
2033                 xprt->reestablish_timeout <<= 1;
2034                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2035                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2036         } else {
2037                 dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2038                 queue_delayed_work(rpciod_workqueue,
2039                                    &transport->connect_worker, 0);
2040         }
2041 }
2042
2043 static void xs_tcp_connect(struct rpc_task *task)
2044 {
2045         struct rpc_xprt *xprt = task->tk_xprt;
2046
2047         /* Exit if we need to wait for socket shutdown to complete */
2048         if (test_bit(XPRT_CLOSING, &xprt->state))
2049                 return;
2050         xs_connect(task);
2051 }
2052
2053 /**
2054  * xs_udp_print_stats - display UDP socket-specifc stats
2055  * @xprt: rpc_xprt struct containing statistics
2056  * @seq: output file
2057  *
2058  */
2059 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2060 {
2061         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2062
2063         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2064                         transport->port,
2065                         xprt->stat.bind_count,
2066                         xprt->stat.sends,
2067                         xprt->stat.recvs,
2068                         xprt->stat.bad_xids,
2069                         xprt->stat.req_u,
2070                         xprt->stat.bklog_u);
2071 }
2072
2073 /**
2074  * xs_tcp_print_stats - display TCP socket-specifc stats
2075  * @xprt: rpc_xprt struct containing statistics
2076  * @seq: output file
2077  *
2078  */
2079 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2080 {
2081         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2082         long idle_time = 0;
2083
2084         if (xprt_connected(xprt))
2085                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
2086
2087         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2088                         transport->port,
2089                         xprt->stat.bind_count,
2090                         xprt->stat.connect_count,
2091                         xprt->stat.connect_time,
2092                         idle_time,
2093                         xprt->stat.sends,
2094                         xprt->stat.recvs,
2095                         xprt->stat.bad_xids,
2096                         xprt->stat.req_u,
2097                         xprt->stat.bklog_u);
2098 }
2099
/*
 * Transport method table for UDP.  Uses the congestion-controlled
 * reserve/release variants and the RTT-based retransmit timeout.
 */
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
2117
/*
 * Transport method table for TCP.  No congestion control (TCP provides
 * it), and connect defers while the old socket is still closing.
 */
static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_tcp_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
#if defined(CONFIG_NFS_V4_1)
	.release_request	= bc_release_request,
#endif /* CONFIG_NFS_V4_1 */
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2135
/*
 * Common allocation and initialization for UDP and TCP transports:
 * allocates the sock_xprt and its request slot table, then copies the
 * destination (and optional source) address into the new transport.
 *
 * Returns the embedded rpc_xprt on success, or an ERR_PTR on failure.
 * On success the caller owns xprt->slot and the sock_xprt.
 */
static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
				      unsigned int slot_table_size)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *new;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       xs_setup_xprt: address too large\n");
		return ERR_PTR(-EBADF);
	}

	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
		return ERR_PTR(-ENOMEM);
	}
	xprt = &new->xprt;

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
				"table\n");
		return ERR_PTR(-ENOMEM);
	}

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	if (args->srcaddr)
		/* NOTE(review): the source address is copied using the
		 * *destination* address length — assumes both sockaddrs
		 * share a family/size; confirm against callers. */
		memcpy(&new->addr, args->srcaddr, args->addrlen);

	return xprt;
}
2171
/* Default UDP retransmit policy: 5s initial, +5s per retry, 30s cap,
 * up to 5 retries. */
static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};
2178
2179 /**
2180  * xs_setup_udp - Set up transport to use a UDP socket
2181  * @args: rpc transport creation arguments
2182  *
2183  */
2184 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2185 {
2186         struct sockaddr *addr = args->dstaddr;
2187         struct rpc_xprt *xprt;
2188         struct sock_xprt *transport;
2189
2190         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2191         if (IS_ERR(xprt))
2192                 return xprt;
2193         transport = container_of(xprt, struct sock_xprt, xprt);
2194
2195         xprt->prot = IPPROTO_UDP;
2196         xprt->tsh_size = 0;
2197         /* XXX: header size can vary due to auth type, IPv6, etc. */
2198         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2199
2200         xprt->bind_timeout = XS_BIND_TO;
2201         xprt->connect_timeout = XS_UDP_CONN_TO;
2202         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2203         xprt->idle_timeout = XS_IDLE_DISC_TO;
2204
2205         xprt->ops = &xs_udp_ops;
2206
2207         xprt->timeout = &xs_udp_default_timeout;
2208
2209         switch (addr->sa_family) {
2210         case AF_INET:
2211                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2212                         xprt_set_bound(xprt);
2213
2214                 INIT_DELAYED_WORK(&transport->connect_worker,
2215                                         xs_udp_connect_worker4);
2216                 xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2217                 break;
2218         case AF_INET6:
2219                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2220                         xprt_set_bound(xprt);
2221
2222                 INIT_DELAYED_WORK(&transport->connect_worker,
2223                                         xs_udp_connect_worker6);
2224                 xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2225                 break;
2226         default:
2227                 kfree(xprt);
2228                 return ERR_PTR(-EAFNOSUPPORT);
2229         }
2230
2231         dprintk("RPC:       set up transport to address %s\n",
2232                         xprt->address_strings[RPC_DISPLAY_ALL]);
2233
2234         if (try_module_get(THIS_MODULE))
2235                 return xprt;
2236
2237         kfree(xprt->slot);
2238         kfree(xprt);
2239         return ERR_PTR(-EINVAL);
2240 }
2241
/* Default TCP retransmit policy: 60s flat timeout, up to 2 retries. */
static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};
2247
2248 /**
2249  * xs_setup_tcp - Set up transport to use a TCP socket
2250  * @args: rpc transport creation arguments
2251  *
2252  */
2253 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2254 {
2255         struct sockaddr *addr = args->dstaddr;
2256         struct rpc_xprt *xprt;
2257         struct sock_xprt *transport;
2258
2259         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2260         if (IS_ERR(xprt))
2261                 return xprt;
2262         transport = container_of(xprt, struct sock_xprt, xprt);
2263
2264         xprt->prot = IPPROTO_TCP;
2265         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2266         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2267
2268         xprt->bind_timeout = XS_BIND_TO;
2269         xprt->connect_timeout = XS_TCP_CONN_TO;
2270         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2271         xprt->idle_timeout = XS_IDLE_DISC_TO;
2272
2273         xprt->ops = &xs_tcp_ops;
2274         xprt->timeout = &xs_tcp_default_timeout;
2275
2276         switch (addr->sa_family) {
2277         case AF_INET:
2278                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2279                         xprt_set_bound(xprt);
2280
2281                 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2282                 xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2283                 break;
2284         case AF_INET6:
2285                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2286                         xprt_set_bound(xprt);
2287
2288                 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2289                 xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2290                 break;
2291         default:
2292                 kfree(xprt);
2293                 return ERR_PTR(-EAFNOSUPPORT);
2294         }
2295
2296         dprintk("RPC:       set up transport to address %s\n",
2297                         xprt->address_strings[RPC_DISPLAY_ALL]);
2298
2299         if (try_module_get(THIS_MODULE))
2300                 return xprt;
2301
2302         kfree(xprt->slot);
2303         kfree(xprt);
2304         return ERR_PTR(-EINVAL);
2305 }
2306
/* Registration record for the UDP transport, selected by IPPROTO_UDP */
static struct xprt_class	xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
	.ident		= IPPROTO_UDP,
	.setup		= xs_setup_udp,
};
2314
/* Registration record for the TCP transport, selected by IPPROTO_TCP */
static struct xprt_class	xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
	.ident		= IPPROTO_TCP,
	.setup		= xs_setup_tcp,
};
2322
2323 /**
2324  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2325  *
2326  */
2327 int init_socket_xprt(void)
2328 {
2329 #ifdef RPC_DEBUG
2330         if (!sunrpc_table_header)
2331                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
2332 #endif
2333
2334         xprt_register_transport(&xs_udp_transport);
2335         xprt_register_transport(&xs_tcp_transport);
2336
2337         return 0;
2338 }
2339
2340 /**
2341  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2342  *
2343  */
2344 void cleanup_socket_xprt(void)
2345 {
2346 #ifdef RPC_DEBUG
2347         if (sunrpc_table_header) {
2348                 unregister_sysctl_table(sunrpc_table_header);
2349                 sunrpc_table_header = NULL;
2350         }
2351 #endif
2352
2353         xprt_unregister_transport(&xs_udp_transport);
2354         xprt_unregister_transport(&xs_tcp_transport);
2355 }