SUNRPC: Simplify TCP receive code by switching to using iterators
author Trond Myklebust <trond.myklebust@hammerspace.com>
Fri, 14 Sep 2018 13:49:06 +0000 (09:49 -0400)
committer Trond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:16 +0000 (15:35 -0400)
Most of this code should also be reusable with other socket types.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
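
The new receive path below is built on pointing a struct msghdr's iov_iter at kernel buffers and letting sock_recvmsg() fill them, instead of walking sk_buffs through tcp_read_sock() callbacks. A minimal sketch of that pattern, using only APIs that appear in this patch (the demo_* helper name is hypothetical):

#include <linux/net.h>
#include <linux/socket.h>
#include <linux/uio.h>

/* Read a 4-byte record marker straight into a kernel buffer.
 * Illustrative only; mirrors the kvec reads added in net/sunrpc/xprtsock.c. */
static ssize_t demo_read_record_marker(struct socket *sock, __be32 *marker)
{
	struct kvec kv = { .iov_base = marker, .iov_len = sizeof(*marker) };
	struct msghdr msg = { 0 };

	/* READ | ITER_KVEC matches the iov_iter API as used by this patch */
	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &kv, 1, kv.iov_len);
	return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
}

A short read simply returns fewer bytes (or -EAGAIN), so the caller can resume later by rebuilding the iterator and skipping the bytes already received; that is what the seek/iov_iter_advance() handling in xs_sock_recvmsg() below is for.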
include/linux/sunrpc/xprtsock.h
include/trace/events/sunrpc.h
net/sunrpc/xprtsock.c

index 005cfb6e7238531666ceab7827c4d99dd518c7fa..458bfe0137f5ec818dca7f1da7deb264940652d8 100644 (file)
@@ -31,15 +31,16 @@ struct sock_xprt {
         * State of TCP reply receive
         */
        struct {
-               __be32          fraghdr,
+               struct {
+                       __be32  fraghdr,
                                xid,
                                calldir;
+               } __attribute__((packed));
 
                u32             offset,
                                len;
 
-               unsigned long   copied,
-                               flags;
+               unsigned long   copied;
        } recv;
 
        /*
@@ -76,21 +77,9 @@ struct sock_xprt {
        void                    (*old_error_report)(struct sock *);
 };
 
-/*
- * TCP receive state flags
- */
-#define TCP_RCV_LAST_FRAG      (1UL << 0)
-#define TCP_RCV_COPY_FRAGHDR   (1UL << 1)
-#define TCP_RCV_COPY_XID       (1UL << 2)
-#define TCP_RCV_COPY_DATA      (1UL << 3)
-#define TCP_RCV_READ_CALLDIR   (1UL << 4)
-#define TCP_RCV_COPY_CALLDIR   (1UL << 5)
-
 /*
  * TCP RPC flags
  */
-#define TCP_RPC_REPLY          (1UL << 6)
-
 #define XPRT_SOCK_CONNECTING   1U
 #define XPRT_SOCK_DATA_READY   (2)
 #define XPRT_SOCK_UPD_TIMEOUT  (3)
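
Folding fraghdr, xid and calldir into an anonymous packed struct is what lets the new code read the start of a record as one contiguous header: for a fresh record all twelve bytes (marker, XID, call direction) land in a single kvec starting at &recv.fraghdr, while a continuation fragment only needs the four-byte marker. A hedged sketch of that read, mirroring xs_read_stream_header()/xs_read_stream_headersize() added later in this patch (demo_* is hypothetical; assumes the headers from the earlier sketch plus <linux/sunrpc/xprtsock.h>):

static ssize_t demo_read_stream_header(struct sock_xprt *transport,
				       struct socket *sock, bool new_record)
{
	struct msghdr msg = { 0 };
	struct kvec kv = {
		.iov_base = &transport->recv.fraghdr,
		/* fraghdr + xid + calldir for a new record,
		 * just the fraghdr for a continuation fragment */
		.iov_len = new_record ? 3 * sizeof(__be32) : sizeof(__be32),
	};

	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &kv, 1, kv.iov_len);
	return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
}

With the header fields contiguous and the byte-at-a-time state machine gone, the recv.flags word and its TCP_RCV_*/TCP_RPC_REPLY bits removed above are no longer needed; xs_read_stream() dispatches on recv.calldir directly.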
index 0aa347194e0f1b781c5bad00af2384f0dd600735..19e08d12696c976333cf0b9c5f69fc623cbb9814 100644 (file)
@@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready,
                        __get_str(port), __entry->err, __entry->total)
 );
 
-#define rpc_show_sock_xprt_flags(flags) \
-       __print_flags(flags, "|", \
-               { TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \
-               { TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \
-               { TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \
-               { TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \
-               { TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \
-               { TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \
-               { TCP_RPC_REPLY, "TCP_RPC_REPLY" })
-
 TRACE_EVENT(xs_tcp_data_recv,
        TP_PROTO(struct sock_xprt *xs),
 
@@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv,
                __string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR])
                __string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT])
                __field(u32, xid)
-               __field(unsigned long, flags)
                __field(unsigned long, copied)
                __field(unsigned int, reclen)
                __field(unsigned long, offset)
@@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv,
                __assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
                __assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
                __entry->xid = be32_to_cpu(xs->recv.xid);
-               __entry->flags = xs->recv.flags;
                __entry->copied = xs->recv.copied;
                __entry->reclen = xs->recv.len;
                __entry->offset = xs->recv.offset;
        ),
 
-       TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
+       TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu",
                        __get_str(addr), __get_str(port), __entry->xid,
-                       rpc_show_sock_xprt_flags(__entry->flags),
                        __entry->copied, __entry->reclen, __entry->offset)
 );
 
index f16406228ead9b44615211fd1498c7cbaf252c3a..06aa750087086a21b23c3c892e8bcca0bda2af7e 100644 (file)
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <linux/bvec.h>
+#include <linux/uio.h>
 
 #include <trace/events/sunrpc.h>
 
 #include "sunrpc.h"
 
-#define RPC_TCP_READ_CHUNK_SZ  (3*512*1024)
-
 static void xs_close(struct rpc_xprt *xprt);
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
                struct socket *sock);
@@ -325,6 +325,323 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
                }
 }
 
+static size_t
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
+{
+       size_t i,n;
+
+       if (!(buf->flags & XDRBUF_SPARSE_PAGES))
+               return want;
+       if (want > buf->page_len)
+               want = buf->page_len;
+       n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       for (i = 0; i < n; i++) {
+               if (buf->pages[i])
+                       continue;
+               buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
+               if (!buf->pages[i]) {
+                       buf->page_len = (i * PAGE_SIZE) - buf->page_base;
+                       return buf->page_len;
+               }
+       }
+       return want;
+}
+
+static ssize_t
+xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
+{
+       ssize_t ret;
+       if (seek != 0)
+               iov_iter_advance(&msg->msg_iter, seek);
+       ret = sock_recvmsg(sock, msg, flags);
+       return ret > 0 ? ret + seek : ret;
+}
+
+static ssize_t
+xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
+               struct kvec *kvec, size_t count, size_t seek)
+{
+       iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count);
+       return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
+               struct bio_vec *bvec, unsigned long nr, size_t count,
+               size_t seek)
+{
+       iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count);
+       return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
+               size_t count)
+{
+       struct kvec kvec = { 0 };
+       return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
+}
+
+static ssize_t
+xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
+               struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
+{
+       size_t want, seek_init = seek, offset = 0;
+       ssize_t ret;
+
+       if (seek < buf->head[0].iov_len) {
+               want = min_t(size_t, count, buf->head[0].iov_len);
+               ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
+               if (ret <= 0)
+                       goto sock_err;
+               offset += ret;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+               seek = 0;
+       } else {
+               seek -= buf->head[0].iov_len;
+               offset += buf->head[0].iov_len;
+       }
+       if (seek < buf->page_len) {
+               want = xs_alloc_sparse_pages(buf,
+                               min_t(size_t, count - offset, buf->page_len),
+                               GFP_NOWAIT);
+               ret = xs_read_bvec(sock, msg, flags, buf->bvec,
+                               xdr_buf_pagecount(buf),
+                               want + buf->page_base,
+                               seek + buf->page_base);
+               if (ret <= 0)
+                       goto sock_err;
+               offset += ret - buf->page_base;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+               seek = 0;
+       } else {
+               seek -= buf->page_len;
+               offset += buf->page_len;
+       }
+       if (seek < buf->tail[0].iov_len) {
+               want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+               ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
+               if (ret <= 0)
+                       goto sock_err;
+               offset += ret;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+       } else
+               offset += buf->tail[0].iov_len;
+       ret = -EMSGSIZE;
+       msg->msg_flags |= MSG_TRUNC;
+out:
+       *read = offset - seek_init;
+       return ret;
+eagain:
+       ret = -EAGAIN;
+       goto out;
+sock_err:
+       offset += seek;
+       goto out;
+}
+
+static void
+xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
+{
+       if (!transport->recv.copied) {
+               if (buf->head[0].iov_len >= transport->recv.offset)
+                       memcpy(buf->head[0].iov_base,
+                                       &transport->recv.xid,
+                                       transport->recv.offset);
+               transport->recv.copied = transport->recv.offset;
+       }
+}
+
+static bool
+xs_read_stream_request_done(struct sock_xprt *transport)
+{
+       return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
+}
+
+static ssize_t
+xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
+               int flags, struct rpc_rqst *req)
+{
+       struct xdr_buf *buf = &req->rq_private_buf;
+       size_t want, read;
+       ssize_t ret;
+
+       xs_read_header(transport, buf);
+
+       want = transport->recv.len - transport->recv.offset;
+       ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+                       transport->recv.copied + want, transport->recv.copied,
+                       &read);
+       transport->recv.offset += read;
+       transport->recv.copied += read;
+       if (transport->recv.offset == transport->recv.len) {
+               if (xs_read_stream_request_done(transport))
+                       msg->msg_flags |= MSG_EOR;
+               return transport->recv.copied;
+       }
+
+       switch (ret) {
+       case -EMSGSIZE:
+               return transport->recv.copied;
+       case 0:
+               return -ESHUTDOWN;
+       default:
+               if (ret < 0)
+                       return ret;
+       }
+       return -EAGAIN;
+}
+
+static size_t
+xs_read_stream_headersize(bool isfrag)
+{
+       if (isfrag)
+               return sizeof(__be32);
+       return 3 * sizeof(__be32);
+}
+
+static ssize_t
+xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
+               int flags, size_t want, size_t seek)
+{
+       struct kvec kvec = {
+               .iov_base = &transport->recv.fraghdr,
+               .iov_len = want,
+       };
+       return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
+}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       struct rpc_xprt *xprt = &transport->xprt;
+       struct rpc_rqst *req;
+       ssize_t ret;
+
+       /* Look up and lock the request corresponding to the given XID */
+       req = xprt_lookup_bc_request(xprt, transport->recv.xid);
+       if (!req) {
+               printk(KERN_WARNING "Callback slot table overflowed\n");
+               return -ESHUTDOWN;
+       }
+
+       ret = xs_read_stream_request(transport, msg, flags, req);
+       if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+               xprt_complete_bc_request(req, ret);
+
+       return ret;
+}
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       return -ESHUTDOWN;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+static ssize_t
+xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       struct rpc_xprt *xprt = &transport->xprt;
+       struct rpc_rqst *req;
+       ssize_t ret = 0;
+
+       /* Look up and lock the request corresponding to the given XID */
+       spin_lock(&xprt->queue_lock);
+       req = xprt_lookup_rqst(xprt, transport->recv.xid);
+       if (!req) {
+               msg->msg_flags |= MSG_TRUNC;
+               goto out;
+       }
+       xprt_pin_rqst(req);
+       spin_unlock(&xprt->queue_lock);
+
+       ret = xs_read_stream_request(transport, msg, flags, req);
+
+       spin_lock(&xprt->queue_lock);
+       if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+               xprt_complete_rqst(req->rq_task, ret);
+       xprt_unpin_rqst(req);
+out:
+       spin_unlock(&xprt->queue_lock);
+       return ret;
+}
+
+static ssize_t
+xs_read_stream(struct sock_xprt *transport, int flags)
+{
+       struct msghdr msg = { 0 };
+       size_t want, read = 0;
+       ssize_t ret = 0;
+
+       if (transport->recv.len == 0) {
+               want = xs_read_stream_headersize(transport->recv.copied != 0);
+               ret = xs_read_stream_header(transport, &msg, flags, want,
+                               transport->recv.offset);
+               if (ret <= 0)
+                       goto out_err;
+               transport->recv.offset = ret;
+               if (ret != want) {
+                       ret = -EAGAIN;
+                       goto out_err;
+               }
+               transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
+                       RPC_FRAGMENT_SIZE_MASK;
+               transport->recv.offset -= sizeof(transport->recv.fraghdr);
+               read = ret;
+       }
+
+       switch (be32_to_cpu(transport->recv.calldir)) {
+       case RPC_CALL:
+               ret = xs_read_stream_call(transport, &msg, flags);
+               break;
+       case RPC_REPLY:
+               ret = xs_read_stream_reply(transport, &msg, flags);
+       }
+       if (msg.msg_flags & MSG_TRUNC) {
+               transport->recv.calldir = cpu_to_be32(-1);
+               transport->recv.copied = -1;
+       }
+       if (ret < 0)
+               goto out_err;
+       read += ret;
+       if (transport->recv.offset < transport->recv.len) {
+               ret = xs_read_discard(transport->sock, &msg, flags,
+                               transport->recv.len - transport->recv.offset);
+               if (ret <= 0)
+                       goto out_err;
+               transport->recv.offset += ret;
+               read += ret;
+               if (transport->recv.offset != transport->recv.len)
+                       return -EAGAIN;
+       }
+       if (xs_read_stream_request_done(transport)) {
+               trace_xs_tcp_data_recv(transport);
+               transport->recv.copied = 0;
+       }
+       transport->recv.offset = 0;
+       transport->recv.len = 0;
+       return read;
+out_err:
+       switch (ret) {
+       case 0:
+       case -ESHUTDOWN:
+               xprt_force_disconnect(&transport->xprt);
+               return -ESHUTDOWN;
+       }
+       return ret;
+}
+
 #define XS_SENDMSG_FLAGS       (MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -484,6 +801,12 @@ static int xs_nospace(struct rpc_rqst *req)
        return ret;
 }
 
+static void
+xs_stream_prepare_request(struct rpc_rqst *req)
+{
+       req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+}
+
 /*
  * Determine if the previous message in the stream was aborted before it
  * could complete transmission.
@@ -1157,263 +1480,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
        xprt_force_disconnect(xprt);
 }
 
-static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       size_t len, used;
-       char *p;
-
-       p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
-       len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->recv.offset += used;
-       if (used != len)
-               return;
-
-       transport->recv.len = ntohl(transport->recv.fraghdr);
-       if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
-               transport->recv.flags |= TCP_RCV_LAST_FRAG;
-       else
-               transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
-       transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
-
-       transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
-       transport->recv.offset = 0;
-
-       /* Sanity check of the record length */
-       if (unlikely(transport->recv.len < 8)) {
-               dprintk("RPC:       invalid TCP record fragment length\n");
-               xs_tcp_force_close(xprt);
-               return;
-       }
-       dprintk("RPC:       reading TCP record fragment of length %d\n",
-                       transport->recv.len);
-}
-
-static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
-{
-       if (transport->recv.offset == transport->recv.len) {
-               transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
-               transport->recv.offset = 0;
-               if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
-                       transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-                       transport->recv.flags |= TCP_RCV_COPY_XID;
-                       transport->recv.copied = 0;
-               }
-       }
-}
-
-static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-       size_t len, used;
-       char *p;
-
-       len = sizeof(transport->recv.xid) - transport->recv.offset;
-       dprintk("RPC:       reading XID (%zu bytes)\n", len);
-       p = ((char *) &transport->recv.xid) + transport->recv.offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->recv.offset += used;
-       if (used != len)
-               return;
-       transport->recv.flags &= ~TCP_RCV_COPY_XID;
-       transport->recv.flags |= TCP_RCV_READ_CALLDIR;
-       transport->recv.copied = 4;
-       dprintk("RPC:       reading %s XID %08x\n",
-                       (transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
-                                                             : "request with",
-                       ntohl(transport->recv.xid));
-       xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
-                                      struct xdr_skb_reader *desc)
-{
-       size_t len, used;
-       u32 offset;
-       char *p;
-
-       /*
-        * We want transport->recv.offset to be 8 at the end of this routine
-        * (4 bytes for the xid and 4 bytes for the call/reply flag).
-        * When this function is called for the first time,
-        * transport->recv.offset is 4 (after having already read the xid).
-        */
-       offset = transport->recv.offset - sizeof(transport->recv.xid);
-       len = sizeof(transport->recv.calldir) - offset;
-       dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-       p = ((char *) &transport->recv.calldir) + offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->recv.offset += used;
-       if (used != len)
-               return;
-       transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
-       /*
-        * We don't yet have the XDR buffer, so we will write the calldir
-        * out after we get the buffer from the 'struct rpc_rqst'
-        */
-       switch (ntohl(transport->recv.calldir)) {
-       case RPC_REPLY:
-               transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-               transport->recv.flags |= TCP_RCV_COPY_DATA;
-               transport->recv.flags |= TCP_RPC_REPLY;
-               break;
-       case RPC_CALL:
-               transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-               transport->recv.flags |= TCP_RCV_COPY_DATA;
-               transport->recv.flags &= ~TCP_RPC_REPLY;
-               break;
-       default:
-               dprintk("RPC:       invalid request message type\n");
-               xs_tcp_force_close(&transport->xprt);
-       }
-       xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
-                                    struct xdr_skb_reader *desc,
-                                    struct rpc_rqst *req)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct xdr_buf *rcvbuf;
-       size_t len;
-       ssize_t r;
-
-       rcvbuf = &req->rq_private_buf;
-
-       if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
-               /*
-                * Save the RPC direction in the XDR buffer
-                */
-               memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
-                       &transport->recv.calldir,
-                       sizeof(transport->recv.calldir));
-               transport->recv.copied += sizeof(transport->recv.calldir);
-               transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
-       }
-
-       len = desc->count;
-       if (len > transport->recv.len - transport->recv.offset)
-               desc->count = transport->recv.len - transport->recv.offset;
-       r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
-                                         desc, xdr_skb_read_bits);
-
-       if (desc->count) {
-               /* Error when copying to the receive buffer,
-                * usually because we weren't able to allocate
-                * additional buffer pages. All we can do now
-                * is turn off TCP_RCV_COPY_DATA, so the request
-                * will not receive any additional updates,
-                * and time out.
-                * Any remaining data from this record will
-                * be discarded.
-                */
-               transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-               dprintk("RPC:       XID %08x truncated request\n",
-                               ntohl(transport->recv.xid));
-               dprintk("RPC:       xprt = %p, recv.copied = %lu, "
-                               "recv.offset = %u, recv.len = %u\n",
-                               xprt, transport->recv.copied,
-                               transport->recv.offset, transport->recv.len);
-               return;
-       }
-
-       transport->recv.copied += r;
-       transport->recv.offset += r;
-       desc->count = len - r;
-
-       dprintk("RPC:       XID %08x read %zd bytes\n",
-                       ntohl(transport->recv.xid), r);
-       dprintk("RPC:       xprt = %p, recv.copied = %lu, recv.offset = %u, "
-                       "recv.len = %u\n", xprt, transport->recv.copied,
-                       transport->recv.offset, transport->recv.len);
-
-       if (transport->recv.copied == req->rq_private_buf.buflen)
-               transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-       else if (transport->recv.offset == transport->recv.len) {
-               if (transport->recv.flags & TCP_RCV_LAST_FRAG)
-                       transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-       }
-}
-
-/*
- * Finds the request corresponding to the RPC xid and invokes the common
- * tcp read code to read the data.
- */
-static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
-                                   struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
-
-       dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
-
-       /* Find and lock the request corresponding to this xid */
-       spin_lock(&xprt->queue_lock);
-       req = xprt_lookup_rqst(xprt, transport->recv.xid);
-       if (!req) {
-               dprintk("RPC:       XID %08x request not found!\n",
-                               ntohl(transport->recv.xid));
-               spin_unlock(&xprt->queue_lock);
-               return -1;
-       }
-       xprt_pin_rqst(req);
-       spin_unlock(&xprt->queue_lock);
-
-       xs_tcp_read_common(xprt, desc, req);
-
-       spin_lock(&xprt->queue_lock);
-       if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-               xprt_complete_rqst(req->rq_task, transport->recv.copied);
-       xprt_unpin_rqst(req);
-       spin_unlock(&xprt->queue_lock);
-       return 0;
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-/*
- * Obtains an rpc_rqst previously allocated and invokes the common
- * tcp read code to read the data.  The result is placed in the callback
- * queue.
- * If we're unable to obtain the rpc_rqst we schedule the closing of the
- * connection and return -1.
- */
-static int xs_tcp_read_callback(struct rpc_xprt *xprt,
-                                      struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
-
-       /* Look up the request corresponding to the given XID */
-       req = xprt_lookup_bc_request(xprt, transport->recv.xid);
-       if (req == NULL) {
-               printk(KERN_WARNING "Callback slot table overflowed\n");
-               xprt_force_disconnect(xprt);
-               return -1;
-       }
-
-       dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
-       xs_tcp_read_common(xprt, desc, req);
-
-       if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-               xprt_complete_bc_request(req, transport->recv.copied);
-
-       return 0;
-}
-
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-                                       struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-
-       return (transport->recv.flags & TCP_RPC_REPLY) ?
-               xs_tcp_read_reply(xprt, desc) :
-               xs_tcp_read_callback(xprt, desc);
-}
-
 static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 {
        int ret;
@@ -1429,106 +1496,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 {
        return PAGE_SIZE;
 }
-#else
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-                                       struct xdr_skb_reader *desc)
-{
-       return xs_tcp_read_reply(xprt, desc);
-}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-/*
- * Read data off the transport.  This can be either an RPC_CALL or an
- * RPC_REPLY.  Relay the processing to helper functions.
- */
-static void xs_tcp_read_data(struct rpc_xprt *xprt,
-                                   struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-
-       if (_xs_tcp_read_data(xprt, desc) == 0)
-               xs_tcp_check_fraghdr(transport);
-       else {
-               /*
-                * The transport_lock protects the request handling.
-                * There's no need to hold it to update the recv.flags.
-                */
-               transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-       }
-}
-
-static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-       size_t len;
-
-       len = transport->recv.len - transport->recv.offset;
-       if (len > desc->count)
-               len = desc->count;
-       desc->count -= len;
-       desc->offset += len;
-       transport->recv.offset += len;
-       dprintk("RPC:       discarded %zu bytes\n", len);
-       xs_tcp_check_fraghdr(transport);
-}
-
-static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
-{
-       struct rpc_xprt *xprt = rd_desc->arg.data;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       struct xdr_skb_reader desc = {
-               .skb    = skb,
-               .offset = offset,
-               .count  = len,
-       };
-       size_t ret;
-
-       dprintk("RPC:       xs_tcp_data_recv started\n");
-       do {
-               trace_xs_tcp_data_recv(transport);
-               /* Read in a new fragment marker if necessary */
-               /* Can we ever really expect to get completely empty fragments? */
-               if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
-                       xs_tcp_read_fraghdr(xprt, &desc);
-                       continue;
-               }
-               /* Read in the xid if necessary */
-               if (transport->recv.flags & TCP_RCV_COPY_XID) {
-                       xs_tcp_read_xid(transport, &desc);
-                       continue;
-               }
-               /* Read in the call/reply flag */
-               if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
-                       xs_tcp_read_calldir(transport, &desc);
-                       continue;
-               }
-               /* Read in the request data */
-               if (transport->recv.flags & TCP_RCV_COPY_DATA) {
-                       xs_tcp_read_data(xprt, &desc);
-                       continue;
-               }
-               /* Skip over any trailing bytes on short reads */
-               xs_tcp_read_discard(transport, &desc);
-       } while (desc.count);
-       ret = len - desc.count;
-       if (ret < rd_desc->count)
-               rd_desc->count -= ret;
-       else
-               rd_desc->count = 0;
-       trace_xs_tcp_data_recv(transport);
-       dprintk("RPC:       xs_tcp_data_recv done\n");
-       return ret;
-}
-
 static void xs_tcp_data_receive(struct sock_xprt *transport)
 {
        struct rpc_xprt *xprt = &transport->xprt;
        struct sock *sk;
-       read_descriptor_t rd_desc = {
-               .arg.data = xprt,
-       };
-       unsigned long total = 0;
-       int read = 0;
+       size_t read = 0;
+       ssize_t ret = 0;
 
 restart:
        mutex_lock(&transport->recv_mutex);
@@ -1536,18 +1511,12 @@ restart:
        if (sk == NULL)
                goto out;
 
-       /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
        for (;;) {
-               rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
-               lock_sock(sk);
-               read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-               if (rd_desc.count != 0 || read < 0) {
-                       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
-                       release_sock(sk);
+               clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+               ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL);
+               if (ret < 0)
                        break;
-               }
-               release_sock(sk);
-               total += read;
+               read += ret;
                if (need_resched()) {
                        mutex_unlock(&transport->recv_mutex);
                        cond_resched();
@@ -1558,7 +1527,7 @@ restart:
                queue_work(xprtiod_workqueue, &transport->recv_worker);
 out:
        mutex_unlock(&transport->recv_mutex);
-       trace_xs_tcp_data_ready(xprt, read, total);
+       trace_xs_tcp_data_ready(xprt, ret, read);
 }
 
 static void xs_tcp_data_receive_workfn(struct work_struct *work)
@@ -2380,7 +2349,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
        transport->recv.offset = 0;
        transport->recv.len = 0;
        transport->recv.copied = 0;
-       transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
        transport->xmit.offset = 0;
 
        /* Tell the socket layer to start connecting... */
@@ -2802,6 +2770,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
        .connect                = xs_connect,
        .buf_alloc              = rpc_malloc,
        .buf_free               = rpc_free,
+       .prepare_request        = xs_stream_prepare_request,
        .send_request           = xs_tcp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xs_tcp_shutdown,
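
The new .prepare_request hook completes the picture: before a request is transmitted, xdr_alloc_bvec() builds a bio_vec array describing the pages of the receive buffer (rq_rcv_buf), so the page portion of the reply can later be filled through an ITER_BVEC iterator with no intermediate copy. A sketch of that page-read step, assuming buf->bvec has already been populated and using only helpers visible in this patch (demo_* is hypothetical; add <linux/sunrpc/xdr.h> and <linux/bvec.h> to the headers from the first sketch):

static ssize_t demo_read_pages(struct socket *sock, struct xdr_buf *buf,
			       size_t count)
{
	struct msghdr msg = { 0 };

	/* buf->bvec was set up by xdr_alloc_bvec() in xs_stream_prepare_request();
	 * page_base accounts for data that does not start on a page boundary. */
	iov_iter_bvec(&msg.msg_iter, READ | ITER_BVEC, buf->bvec,
		      xdr_buf_pagecount(buf), count + buf->page_base);
	return sock_recvmsg(sock, &msg, MSG_DONTWAIT);
}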