Merge tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
author    Linus Torvalds <torvalds@linux-foundation.org>
          Tue, 22 Mar 2016 20:16:21 +0000 (13:16 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
          Tue, 22 Mar 2016 20:16:21 +0000 (13:16 -0700)
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Features:
   - Add support for multiple NFSv4.1 callbacks in flight
   - Initial patchset for RPC multipath support
   - Adapt RPC/RDMA to use the new completion queue API

  Bugfixes and cleanups:
   - nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
   - Cleanups to remove nfs_inode_dio_wait and nfs4_file_fsync
   - Fix RPC/RDMA credit accounting
   - Properly handle RDMA_ERROR replies
   - xprtrdma: Do not wait if ib_post_send() fails
   - xprtrdma: Segment head and tail XDR buffers on page boundaries
   - xprtrdma cleanups for dprintk, physical_op_map and unused macros"
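The multipath series lets a single rpc_clnt drive several transports at once. As a rough sketch of the new API (mirroring the pNFS flexfiles caller merged below; the helper name add_path and its address arguments are illustrative, not part of the series):

static int add_path(struct nfs_client *clp, struct sockaddr *sap,
		    size_t salen)
{
	struct xprt_create xprt_args = {
		.ident		= XPRT_TRANSPORT_TCP,
		.net		= clp->cl_net,
		.dstaddr	= sap,
		.addrlen	= salen,
		.servername	= clp->cl_hostname,
	};

	/* rpc_clnt_test_and_add_xprt() pings the new path with a NULL
	 * RPC and only commits it to the switch if that succeeds. */
	return rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
				 rpc_clnt_test_and_add_xprt, NULL);
}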

* tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (35 commits)
  nfs/blocklayout: make sure to make an aligned read request
  nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
  nfs: remove nfs_inode_dio_wait
  nfs: remove nfs4_file_fsync
  xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
  xprtrdma: Use an anonymous union in struct rpcrdma_mw
  xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
  xprtrdma: Serialize credit accounting again
  xprtrdma: Properly handle RDMA_ERROR replies
  rpcrdma: Add RPCRDMA_HDRLEN_ERR
  xprtrdma: Do not wait if ib_post_send() fails
  xprtrdma: Segment head and tail XDR buffers on page boundaries
  xprtrdma: Clean up dprintk format string containing a newline
  xprtrdma: Clean up physical_op_map()
  xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro
  NFS: add callback_ops to nfs4_proc_bind_conn_to_session_callback
  pnfs/NFSv4.1: Add multipath capabilities to pNFS flexfiles servers over NFSv3
  SUNRPC: Allow addition of new transports to a struct rpc_clnt
  NFSv4.1: nfs4_proc_bind_conn_to_session must iterate over all connections
  SUNRPC: Make NFS swap work with multipath
  ...

33 files changed:
fs/nfs/blocklayout/blocklayout.c
fs/nfs/callback.h
fs/nfs/callback_proc.c
fs/nfs/callback_xdr.c
fs/nfs/file.c
fs/nfs/flexfilelayout/flexfilelayoutdev.c
fs/nfs/inode.c
fs/nfs/internal.h
fs/nfs/nfs4file.c
fs/nfs/nfs4proc.c
fs/nfs/nfs4session.c
fs/nfs/nfs4session.h
fs/nfs/pnfs_nfs.c
include/linux/sunrpc/clnt.h
include/linux/sunrpc/rpc_rdma.h
include/linux/sunrpc/sched.h
include/linux/sunrpc/xprt.h
include/linux/sunrpc/xprtmultipath.h [new file with mode: 0644]
include/linux/sunrpc/xprtrdma.h
net/sunrpc/Makefile
net/sunrpc/auth_gss/auth_gss.c
net/sunrpc/clnt.c
net/sunrpc/rpcb_clnt.c
net/sunrpc/sched.c
net/sunrpc/xprt.c
net/sunrpc/xprtmultipath.c [new file with mode: 0644]
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/physical_ops.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h
net/sunrpc/xprtsock.c

diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index ddd0138f410c8af4963575b4765d2a3f069c21de..8bc870e4c4670738747ffb2fd5f24b669f30e95a 100644
@@ -743,7 +743,7 @@ bl_set_layoutdriver(struct nfs_server *server, const struct nfs_fh *fh)
 
 static bool
 is_aligned_req(struct nfs_pageio_descriptor *pgio,
-               struct nfs_page *req, unsigned int alignment)
+               struct nfs_page *req, unsigned int alignment, bool is_write)
 {
        /*
         * Always accept buffered writes, higher layers take care of the
@@ -758,7 +758,8 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio,
        if (IS_ALIGNED(req->wb_bytes, alignment))
                return true;
 
-       if (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode)) {
+       if (is_write &&
+           (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode))) {
                /*
                 * If the write goes up to the inode size, just write
                 * the full page.  Data past the inode size is
@@ -775,7 +776,7 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio,
 static void
 bl_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
-       if (!is_aligned_req(pgio, req, SECTOR_SIZE)) {
+       if (!is_aligned_req(pgio, req, SECTOR_SIZE, false)) {
                nfs_pageio_reset_read_mds(pgio);
                return;
        }
@@ -791,7 +792,7 @@ static size_t
 bl_pg_test_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
                struct nfs_page *req)
 {
-       if (!is_aligned_req(pgio, req, SECTOR_SIZE))
+       if (!is_aligned_req(pgio, req, SECTOR_SIZE, false))
                return 0;
        return pnfs_generic_pg_test(pgio, prev, req);
 }
@@ -824,7 +825,7 @@ bl_pg_init_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
        u64 wb_size;
 
-       if (!is_aligned_req(pgio, req, PAGE_SIZE)) {
+       if (!is_aligned_req(pgio, req, PAGE_SIZE, true)) {
                nfs_pageio_reset_write_mds(pgio);
                return;
        }
@@ -846,7 +847,7 @@ static size_t
 bl_pg_test_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
                 struct nfs_page *req)
 {
-       if (!is_aligned_req(pgio, req, PAGE_SIZE))
+       if (!is_aligned_req(pgio, req, PAGE_SIZE, true))
                return 0;
        return pnfs_generic_pg_test(pgio, prev, req);
 }
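
Condensed, the predicate above now reads as follows (a sketch only; the unconditional buffered-write fast path and the offset-alignment check elided from the hunk context are folded in):

static bool aligned_ok(loff_t offset, unsigned int bytes, loff_t i_size,
		       unsigned int alignment, bool is_write)
{
	if (IS_ALIGNED(offset, alignment) && IS_ALIGNED(bytes, alignment))
		return true;
	/* Only writes may be rounded out at EOF: data past i_size is
	 * zero-filled anyway. Reads must stay aligned, hence is_write. */
	return is_write && offset + bytes == i_size;
}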
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index ff8195bd75ea11ae996e504dd546fa59776aa4d7..5fe1cecbf9f033ce0603639e771c1d82520599b0 100644
@@ -37,10 +37,11 @@ enum nfs4_callback_opnum {
        OP_CB_ILLEGAL = 10044,
 };
 
+struct nfs4_slot;
 struct cb_process_state {
        __be32                  drc_status;
        struct nfs_client       *clp;
-       u32                     slotid;
+       struct nfs4_slot        *slot;
        u32                     minorversion;
        struct net              *net;
 };
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index f0939d097406a21b3f77d3ad2b8eec96807fa39c..618ced381a1405ff7c31f704409781c7ef1e00f6 100644
@@ -354,47 +354,38 @@ out:
  * a single outstanding callback request at a time.
  */
 static __be32
-validate_seqid(struct nfs4_slot_table *tbl, struct cb_sequenceargs * args)
+validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
+               const struct cb_sequenceargs * args)
 {
-       struct nfs4_slot *slot;
-
-       dprintk("%s enter. slotid %u seqid %u\n",
-               __func__, args->csa_slotid, args->csa_sequenceid);
+       dprintk("%s enter. slotid %u seqid %u, slot table seqid: %u\n",
+               __func__, args->csa_slotid, args->csa_sequenceid, slot->seq_nr);
 
-       if (args->csa_slotid >= NFS41_BC_MAX_CALLBACKS)
+       if (args->csa_slotid > tbl->server_highest_slotid)
                return htonl(NFS4ERR_BADSLOT);
 
-       slot = tbl->slots + args->csa_slotid;
-       dprintk("%s slot table seqid: %u\n", __func__, slot->seq_nr);
-
-       /* Normal */
-       if (likely(args->csa_sequenceid == slot->seq_nr + 1))
-               goto out_ok;
-
        /* Replay */
        if (args->csa_sequenceid == slot->seq_nr) {
                dprintk("%s seqid %u is a replay\n",
                        __func__, args->csa_sequenceid);
+               if (nfs4_test_locked_slot(tbl, slot->slot_nr))
+                       return htonl(NFS4ERR_DELAY);
                /* Signal process_op to set this error on next op */
                if (args->csa_cachethis == 0)
                        return htonl(NFS4ERR_RETRY_UNCACHED_REP);
 
-               /* The ca_maxresponsesize_cached is 0 with no DRC */
-               else if (args->csa_cachethis == 1)
-                       return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
+               /* Liar! We never allowed you to set csa_cachethis != 0 */
+               return htonl(NFS4ERR_SEQ_FALSE_RETRY);
        }
 
        /* Wraparound */
-       if (args->csa_sequenceid == 1 && (slot->seq_nr + 1) == 0) {
-               slot->seq_nr = 1;
-               goto out_ok;
-       }
+       if (unlikely(slot->seq_nr == 0xFFFFFFFFU)) {
+               if (args->csa_sequenceid == 1)
+                       return htonl(NFS4_OK);
+       } else if (likely(args->csa_sequenceid == slot->seq_nr + 1))
+               return htonl(NFS4_OK);
 
        /* Misordered request */
        return htonl(NFS4ERR_SEQ_MISORDERED);
-out_ok:
-       tbl->highest_used_slotid = args->csa_slotid;
-       return htonl(NFS4_OK);
 }
 
 /*
@@ -473,6 +464,12 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
        tbl = &clp->cl_session->bc_slot_table;
        slot = tbl->slots + args->csa_slotid;
 
+       /* Set up res before grabbing the spinlock */
+       memcpy(&res->csr_sessionid, &args->csa_sessionid,
+              sizeof(res->csr_sessionid));
+       res->csr_sequenceid = args->csa_sequenceid;
+       res->csr_slotid = args->csa_slotid;
+
        spin_lock(&tbl->slot_tbl_lock);
        /* state manager is resetting the session */
        if (test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state)) {
@@ -485,18 +482,26 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
                goto out_unlock;
        }
 
-       memcpy(&res->csr_sessionid, &args->csa_sessionid,
-              sizeof(res->csr_sessionid));
-       res->csr_sequenceid = args->csa_sequenceid;
-       res->csr_slotid = args->csa_slotid;
-       res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
-       res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
+       status = htonl(NFS4ERR_BADSLOT);
+       slot = nfs4_lookup_slot(tbl, args->csa_slotid);
+       if (IS_ERR(slot))
+               goto out_unlock;
+
+       res->csr_highestslotid = tbl->server_highest_slotid;
+       res->csr_target_highestslotid = tbl->target_highest_slotid;
 
-       status = validate_seqid(tbl, args);
+       status = validate_seqid(tbl, slot, args);
        if (status)
                goto out_unlock;
+       if (!nfs4_try_to_lock_slot(tbl, slot)) {
+               status = htonl(NFS4ERR_DELAY);
+               goto out_unlock;
+       }
+       cps->slot = slot;
 
-       cps->slotid = args->csa_slotid;
+       /* The ca_maxresponsesize_cached is 0 with no DRC */
+       if (args->csa_cachethis != 0)
+               return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
 
        /*
         * Check for pending referring calls.  If a match is found, a
@@ -513,7 +518,7 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
         * If CB_SEQUENCE returns an error, then the state of the slot
         * (sequence ID, cached reply) MUST NOT change.
         */
-       slot->seq_nr++;
+       slot->seq_nr = args->csa_sequenceid;
 out_unlock:
        spin_unlock(&tbl->slot_tbl_lock);
 
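The sequencing rules that validate_seqid() now enforces reduce to one decision table (a sketch; slot locking and the uncached-replay case are simplified):

static __be32 classify_seqid(u32 slot_seq, u32 incoming, bool slot_in_use)
{
	if (incoming == slot_seq) {		/* retransmission */
		if (slot_in_use)
			return htonl(NFS4ERR_DELAY);
		return htonl(NFS4ERR_SEQ_FALSE_RETRY);
	}
	if (slot_seq == 0xFFFFFFFFU) {		/* wraparound: restart at 1 */
		if (incoming == 1)
			return htonl(NFS4_OK);
	} else if (incoming == slot_seq + 1)	/* the normal case */
		return htonl(NFS4_OK);
	return htonl(NFS4ERR_SEQ_MISORDERED);
}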
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 646cdac73488e96041f2bcad33b4220b096da684..976c90608e5618103d385a3436cafe461873a867 100644
@@ -752,7 +752,8 @@ preprocess_nfs41_op(int nop, unsigned int op_nr, struct callback_op **op)
        return htonl(NFS_OK);
 }
 
-static void nfs4_callback_free_slot(struct nfs4_session *session)
+static void nfs4_callback_free_slot(struct nfs4_session *session,
+               struct nfs4_slot *slot)
 {
        struct nfs4_slot_table *tbl = &session->bc_slot_table;
 
@@ -761,15 +762,17 @@ static void nfs4_callback_free_slot(struct nfs4_session *session)
         * Let the state manager know callback processing done.
         * A single slot, so highest used slotid is either 0 or -1
         */
-       tbl->highest_used_slotid = NFS4_NO_SLOT;
+       nfs4_free_slot(tbl, slot);
        nfs4_slot_tbl_drain_complete(tbl);
        spin_unlock(&tbl->slot_tbl_lock);
 }
 
 static void nfs4_cb_free_slot(struct cb_process_state *cps)
 {
-       if (cps->slotid != NFS4_NO_SLOT)
-               nfs4_callback_free_slot(cps->clp->cl_session);
+       if (cps->slot) {
+               nfs4_callback_free_slot(cps->clp->cl_session, cps->slot);
+               cps->slot = NULL;
+       }
 }
 
 #else /* CONFIG_NFS_V4_1 */
@@ -893,7 +896,6 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
        struct cb_process_state cps = {
                .drc_status = 0,
                .clp = NULL,
-               .slotid = NFS4_NO_SLOT,
                .net = SVC_NET(rqstp),
        };
        unsigned int nops = 0;
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 748bb813b8ecd63095f2f50b8e733fe960e188ec..89bf093d342a583d0d3c19738a2baebf046c68e2 100644
@@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(nfs_file_mmap);
  * nfs_file_write() that a write error occurred, and hence cause it to
  * fall back to doing a synchronous write.
  */
-int
+static int
 nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync)
 {
        struct nfs_open_context *ctx = nfs_file_open_context(file);
@@ -263,9 +263,8 @@ nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync)
 out:
        return ret;
 }
-EXPORT_SYMBOL_GPL(nfs_file_fsync_commit);
 
-static int
+int
 nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
        int ret;
@@ -273,13 +272,15 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
        trace_nfs_fsync_enter(inode);
 
-       nfs_inode_dio_wait(inode);
+       inode_dio_wait(inode);
        do {
                ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
                if (ret != 0)
                        break;
                inode_lock(inode);
                ret = nfs_file_fsync_commit(file, start, end, datasync);
+               if (!ret)
+                       ret = pnfs_sync_inode(inode, !!datasync);
                inode_unlock(inode);
                /*
                 * If nfs_file_fsync_commit detected a server reboot, then
@@ -293,6 +294,7 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
        trace_nfs_fsync_exit(inode, ret);
        return ret;
 }
+EXPORT_SYMBOL_GPL(nfs_file_fsync);
 
 /*
  * Decide whether a read/modify/write cycle may be more efficient
@@ -368,7 +370,7 @@ start:
        /*
         * Wait for O_DIRECT to complete
         */
-       nfs_inode_dio_wait(mapping->host);
+       inode_dio_wait(mapping->host);
 
        page = grab_cache_page_write_begin(mapping, index, flags);
        if (!page)
diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
index eb370460ce203c11c8461e88115cc0fab9c417fa..add0e5a70bd60f70479452ddc8a0ccf853c2c87f 100644
@@ -418,6 +418,8 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
                                pnfs_error_mark_layout_for_return(ino, lseg);
                } else
                        pnfs_error_mark_layout_for_return(ino, lseg);
+               ds = NULL;
+               goto out;
        }
 out_update_creds:
        if (ff_layout_update_mirror_cred(mirror, ds))
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 86faecf8f328f2639bab1c5f7391df6d659610f2..33d18c4119057bb874604398337a990e94b5f9c4 100644
@@ -141,7 +141,7 @@ void nfs_evict_inode(struct inode *inode)
 
 int nfs_sync_inode(struct inode *inode)
 {
-       nfs_inode_dio_wait(inode);
+       inode_dio_wait(inode);
        return nfs_wb_all(inode);
 }
 EXPORT_SYMBOL_GPL(nfs_sync_inode);
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 9a547aa3ec8e873160da5a36906eb3813650034d..565f8135ae1fdeb3bdce2c5797451d652c501b4b 100644
@@ -358,7 +358,7 @@ int nfs_mknod(struct inode *, struct dentry *, umode_t, dev_t);
 int nfs_rename(struct inode *, struct dentry *, struct inode *, struct dentry *);
 
 /* file.c */
-int nfs_file_fsync_commit(struct file *, loff_t, loff_t, int);
+int nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync);
 loff_t nfs_file_llseek(struct file *, loff_t, int);
 ssize_t nfs_file_read(struct kiocb *, struct iov_iter *);
 ssize_t nfs_file_splice_read(struct file *, loff_t *, struct pipe_inode_info *,
@@ -515,10 +515,6 @@ extern int nfs_sillyrename(struct inode *dir, struct dentry *dentry);
 /* direct.c */
 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
                              struct nfs_direct_req *dreq);
-static inline void nfs_inode_dio_wait(struct inode *inode)
-{
-       inode_dio_wait(inode);
-}
 extern ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq);
 
 /* nfs4proc.c */
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 57ca1c8039c1e00bfcb6e9bb4d8fe7e40f4578d5..22c35abbee9d6c88244b220054d2256c30d050ec 100644
@@ -128,37 +128,6 @@ nfs4_file_flush(struct file *file, fl_owner_t id)
        return vfs_fsync(file, 0);
 }
 
-static int
-nfs4_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
-{
-       int ret;
-       struct inode *inode = file_inode(file);
-
-       trace_nfs_fsync_enter(inode);
-
-       nfs_inode_dio_wait(inode);
-       do {
-               ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
-               if (ret != 0)
-                       break;
-               inode_lock(inode);
-               ret = nfs_file_fsync_commit(file, start, end, datasync);
-               if (!ret)
-                       ret = pnfs_sync_inode(inode, !!datasync);
-               inode_unlock(inode);
-               /*
-                * If nfs_file_fsync_commit detected a server reboot, then
-                * resend all dirty pages that might have been covered by
-                * the NFS_CONTEXT_RESEND_WRITES flag
-                */
-               start = 0;
-               end = LLONG_MAX;
-       } while (ret == -EAGAIN);
-
-       trace_nfs_fsync_exit(inode, ret);
-       return ret;
-}
-
 #ifdef CONFIG_NFS_V4_2
 static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
 {
@@ -266,7 +235,7 @@ const struct file_operations nfs4_file_operations = {
        .open           = nfs4_file_open,
        .flush          = nfs4_file_flush,
        .release        = nfs_file_release,
-       .fsync          = nfs4_file_fsync,
+       .fsync          = nfs_file_fsync,
        .lock           = nfs_lock,
        .flock          = nfs_flock,
        .splice_read    = nfs_file_splice_read,
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 400a70b3be7b29e366c6a0737c39369fd1dc84cc..327b8c34d3606e5d234f006fd26a3293f12b1d36 100644
@@ -6783,13 +6783,26 @@ nfs41_same_server_scope(struct nfs41_server_scope *a,
        return false;
 }
 
+static void
+nfs4_bind_one_conn_to_session_done(struct rpc_task *task, void *calldata)
+{
+}
+
+static const struct rpc_call_ops nfs4_bind_one_conn_to_session_ops = {
+       .rpc_call_done =  &nfs4_bind_one_conn_to_session_done,
+};
+
 /*
- * nfs4_proc_bind_conn_to_session()
+ * nfs4_proc_bind_one_conn_to_session()
  *
  * The 4.1 client currently uses the same TCP connection for the
  * fore and backchannel.
  */
-int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
+static
+int nfs4_proc_bind_one_conn_to_session(struct rpc_clnt *clnt,
+               struct rpc_xprt *xprt,
+               struct nfs_client *clp,
+               struct rpc_cred *cred)
 {
        int status;
        struct nfs41_bind_conn_to_session_args args = {
@@ -6804,6 +6817,14 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
                .rpc_resp = &res,
                .rpc_cred = cred,
        };
+       struct rpc_task_setup task_setup_data = {
+               .rpc_client = clnt,
+               .rpc_xprt = xprt,
+               .callback_ops = &nfs4_bind_one_conn_to_session_ops,
+               .rpc_message = &msg,
+               .flags = RPC_TASK_TIMEOUT,
+       };
+       struct rpc_task *task;
 
        dprintk("--> %s\n", __func__);
 
@@ -6811,7 +6832,16 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
        if (!(clp->cl_session->flags & SESSION4_BACK_CHAN))
                args.dir = NFS4_CDFC4_FORE;
 
-       status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
+       /* Do not set the backchannel flag unless this is clnt->cl_xprt */
+       if (xprt != rcu_access_pointer(clnt->cl_xprt))
+               args.dir = NFS4_CDFC4_FORE;
+
+       task = rpc_run_task(&task_setup_data);
+       if (!IS_ERR(task)) {
+               status = task->tk_status;
+               rpc_put_task(task);
+       } else
+               status = PTR_ERR(task);
        trace_nfs4_bind_conn_to_session(clp, status);
        if (status == 0) {
                if (memcmp(res.sessionid.data,
@@ -6838,6 +6868,31 @@ out:
        return status;
 }
 
+struct rpc_bind_conn_calldata {
+       struct nfs_client *clp;
+       struct rpc_cred *cred;
+};
+
+static int
+nfs4_proc_bind_conn_to_session_callback(struct rpc_clnt *clnt,
+               struct rpc_xprt *xprt,
+               void *calldata)
+{
+       struct rpc_bind_conn_calldata *p = calldata;
+
+       return nfs4_proc_bind_one_conn_to_session(clnt, xprt, p->clp, p->cred);
+}
+
+int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
+{
+       struct rpc_bind_conn_calldata data = {
+               .clp = clp,
+               .cred = cred,
+       };
+       return rpc_clnt_iterate_for_each_xprt(clp->cl_rpcclient,
+                       nfs4_proc_bind_conn_to_session_callback, &data);
+}
+
 /*
  * Minimum set of SP4_MACH_CRED operations from RFC 5661 in the enforce map
  * and operations we'd like to see to enable certain features in the allow map
@@ -7320,7 +7375,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args)
        args->bc_attrs.max_resp_sz = PAGE_SIZE;
        args->bc_attrs.max_resp_sz_cached = 0;
        args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
-       args->bc_attrs.max_reqs = 1;
+       args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS;
 
        dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
                "max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
diff --git a/fs/nfs/nfs4session.c b/fs/nfs/nfs4session.c
index e23366effcfb1e43bcb81983bcbaeacf2e512002..332d06e64fa910fcfa54ca3fc304e9869f05bc6f 100644
@@ -135,6 +135,43 @@ static struct nfs4_slot *nfs4_find_or_create_slot(struct nfs4_slot_table  *tbl,
        return ERR_PTR(-ENOMEM);
 }
 
+static void nfs4_lock_slot(struct nfs4_slot_table *tbl,
+               struct nfs4_slot *slot)
+{
+       u32 slotid = slot->slot_nr;
+
+       __set_bit(slotid, tbl->used_slots);
+       if (slotid > tbl->highest_used_slotid ||
+           tbl->highest_used_slotid == NFS4_NO_SLOT)
+               tbl->highest_used_slotid = slotid;
+       slot->generation = tbl->generation;
+}
+
+/*
+ * nfs4_try_to_lock_slot - Given a slot try to allocate it
+ *
+ * Note: must be called with the slot_tbl_lock held.
+ */
+bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot)
+{
+       if (nfs4_test_locked_slot(tbl, slot->slot_nr))
+               return false;
+       nfs4_lock_slot(tbl, slot);
+       return true;
+}
+
+/*
+ * nfs4_lookup_slot - Find a slot but don't allocate it
+ *
+ * Note: must be called with the slot_tbl_lock held.
+ */
+struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid)
+{
+       if (slotid <= tbl->max_slotid)
+               return nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
+       return ERR_PTR(-E2BIG);
+}
+
 /*
  * nfs4_alloc_slot - efficiently look for a free slot
  *
@@ -153,18 +190,11 @@ struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl)
                __func__, tbl->used_slots[0], tbl->highest_used_slotid,
                tbl->max_slotid + 1);
        slotid = find_first_zero_bit(tbl->used_slots, tbl->max_slotid + 1);
-       if (slotid > tbl->max_slotid)
-               goto out;
-       ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
-       if (IS_ERR(ret))
-               goto out;
-       __set_bit(slotid, tbl->used_slots);
-       if (slotid > tbl->highest_used_slotid ||
-                       tbl->highest_used_slotid == NFS4_NO_SLOT)
-               tbl->highest_used_slotid = slotid;
-       ret->generation = tbl->generation;
-
-out:
+       if (slotid <= tbl->max_slotid) {
+               ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
+               if (!IS_ERR(ret))
+                       nfs4_lock_slot(tbl, ret);
+       }
        dprintk("<-- %s used_slots=%04lx highest_used=%u slotid=%u\n",
                __func__, tbl->used_slots[0], tbl->highest_used_slotid,
                !IS_ERR(ret) ? ret->slot_nr : NFS4_NO_SLOT);
diff --git a/fs/nfs/nfs4session.h b/fs/nfs/nfs4session.h
index e3ea2c5324d68e92591058e896bf478538302a5a..5b51298d1d03765684dd9ea79599169c01c71aaa 100644
@@ -77,6 +77,8 @@ extern int nfs4_setup_slot_table(struct nfs4_slot_table *tbl,
                unsigned int max_reqs, const char *queue);
 extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl);
 extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl);
+extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid);
+extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
 extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
 extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl);
 bool nfs41_wake_and_assign_slot(struct nfs4_slot_table *tbl,
@@ -88,6 +90,12 @@ static inline bool nfs4_slot_tbl_draining(struct nfs4_slot_table *tbl)
        return !!test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state);
 }
 
+static inline bool nfs4_test_locked_slot(const struct nfs4_slot_table *tbl,
+               u32 slotid)
+{
+       return !!test_bit(slotid, tbl->used_slots);
+}
+
 #if defined(CONFIG_NFS_V4_1)
 extern void nfs41_set_target_slotid(struct nfs4_slot_table *tbl,
                u32 target_highest_slotid);
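
Together, nfs4_lookup_slot() and nfs4_try_to_lock_slot() replace the old fixed single-slot scheme. The intended calling sequence, mirroring nfs4_callback_sequence() above (a sketch, with error handling trimmed):

static struct nfs4_slot *grab_slot(struct nfs4_slot_table *tbl, u32 slotid)
{
	struct nfs4_slot *slot;

	spin_lock(&tbl->slot_tbl_lock);
	slot = nfs4_lookup_slot(tbl, slotid);
	if (!IS_ERR(slot) && !nfs4_try_to_lock_slot(tbl, slot))
		slot = ERR_PTR(-EAGAIN);	/* in use: caller retries */
	spin_unlock(&tbl->slot_tbl_lock);
	return slot;
}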
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 81ac6480f9e77d66d45ba966b31cb440f17f246a..4aaed890048fd34d1fdda141a6479e4fec7ba59b 100644
@@ -606,12 +606,22 @@ static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv,
                dprintk("%s: DS %s: trying address %s\n",
                        __func__, ds->ds_remotestr, da->da_remotestr);
 
-               clp = get_v3_ds_connect(mds_srv->nfs_client,
+               if (!IS_ERR(clp)) {
+                       struct xprt_create xprt_args = {
+                               .ident = XPRT_TRANSPORT_TCP,
+                               .net = clp->cl_net,
+                               .dstaddr = (struct sockaddr *)&da->da_addr,
+                               .addrlen = da->da_addrlen,
+                               .servername = clp->cl_hostname,
+                       };
+                       /* Add this address as an alias */
+                       rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
+                                       rpc_clnt_test_and_add_xprt, NULL);
+               } else
+                       clp = get_v3_ds_connect(mds_srv->nfs_client,
                                        (struct sockaddr *)&da->da_addr,
                                        da->da_addrlen, IPPROTO_TCP,
                                        timeo, retrans, au_flavor);
-               if (!IS_ERR(clp))
-                       break;
        }
 
        if (IS_ERR(clp)) {
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 131032f15cc187e0c7cbd0df80a08f4c44220800..9a7ddbaf116e56036ad74997773ffe178f2e6c18 100644
@@ -25,6 +25,7 @@
 #include <asm/signal.h>
 #include <linux/path.h>
 #include <net/ipv6.h>
+#include <linux/sunrpc/xprtmultipath.h>
 
 struct rpc_inode;
 
@@ -67,6 +68,7 @@ struct rpc_clnt {
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        struct dentry           *cl_debugfs;    /* debugfs directory */
 #endif
+       struct rpc_xprt_iter    cl_xpi;
 };
 
 /*
@@ -139,7 +141,6 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
                                        struct rpc_xprt *xprt);
 struct rpc_clnt        *rpc_bind_new_program(struct rpc_clnt *,
                                const struct rpc_program *, u32);
-void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt);
 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
 struct rpc_clnt *rpc_clone_client_set_auth(struct rpc_clnt *,
                                rpc_authflavor_t);
@@ -181,6 +182,21 @@ size_t             rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
 const char     *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
 int            rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t);
 
+int            rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
+                       int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
+                       void *data);
+
+int            rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
+                       struct rpc_xprt_switch *xps,
+                       struct rpc_xprt *xprt,
+                       void *dummy);
+int            rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *,
+                       int (*setup)(struct rpc_clnt *,
+                               struct rpc_xprt_switch *,
+                               struct rpc_xprt *,
+                               void *),
+                       void *data);
+
 const char *rpc_proc_name(const struct rpc_task *task);
 #endif /* __KERNEL__ */
 #endif /* _LINUX_SUNRPC_CLNT_H */
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index f33c5a4d6fe47fddb2ae57e4eac448df6ff810c8..3b1ff38f0c37aac2aff8a536b8a49da4b8433207 100644
@@ -93,6 +93,12 @@ struct rpcrdma_msg {
                        __be32 rm_pempty[3];    /* 3 empty chunk lists */
                } rm_padded;
 
+               struct {
+                       __be32 rm_err;
+                       __be32 rm_vers_low;
+                       __be32 rm_vers_high;
+               } rm_error;
+
                __be32 rm_chunks[0];    /* read, write and reply chunks */
 
        } rm_body;
@@ -102,17 +108,13 @@ struct rpcrdma_msg {
  * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
  */
 #define RPCRDMA_HDRLEN_MIN     (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR     (sizeof(__be32) * 5)
 
 enum rpcrdma_errcode {
        ERR_VERS = 1,
        ERR_CHUNK = 2
 };
 
-struct rpcrdma_err_vers {
-       uint32_t rdma_vers_low; /* Version range supported by peer */
-       uint32_t rdma_vers_high;
-};
-
 enum rpcrdma_proc {
        RDMA_MSG = 0,           /* An RPC call or reply msg */
        RDMA_NOMSG = 1,         /* An RPC call or reply msg - separate body */
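
With the rm_error body and RPCRDMA_HDRLEN_ERR in place, a receiver can sanity-check an RDMA_ERROR reply before touching its payload. A hypothetical check (not the merged reply handler) might look like:

static int check_rdma_error(const struct rpcrdma_msg *p, size_t len)
{
	if (len < RPCRDMA_HDRLEN_ERR)	/* rm_xid..rm_type plus rm_err */
		return -EIO;
	if (p->rm_body.rm_error.rm_err == cpu_to_be32(ERR_VERS) &&
	    len >= RPCRDMA_HDRLEN_ERR + 2 * sizeof(__be32))
		pr_warn("peer speaks RPC/RDMA versions %u-%u only\n",
			be32_to_cpu(p->rm_body.rm_error.rm_vers_low),
			be32_to_cpu(p->rm_body.rm_error.rm_vers_high));
	return -EPROTONOSUPPORT;	/* any RDMA_ERROR is fatal here */
}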
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index d703f0ef37d8f87436310247c19ca37f60ea692b..05a1809c44d99e59813576a1e2a6daf242e2a4be 100644
@@ -42,40 +42,43 @@ struct rpc_wait {
  */
 struct rpc_task {
        atomic_t                tk_count;       /* Reference count */
+       int                     tk_status;      /* result of last operation */
        struct list_head        tk_task;        /* global list of tasks */
-       struct rpc_clnt *       tk_client;      /* RPC client */
-       struct rpc_rqst *       tk_rqstp;       /* RPC request */
-
-       /*
-        * RPC call state
-        */
-       struct rpc_message      tk_msg;         /* RPC call info */
 
        /*
         * callback     to be executed after waking up
         * action       next procedure for async tasks
-        * tk_ops       caller callbacks
         */
        void                    (*tk_callback)(struct rpc_task *);
        void                    (*tk_action)(struct rpc_task *);
-       const struct rpc_call_ops *tk_ops;
-       void *                  tk_calldata;
 
        unsigned long           tk_timeout;     /* timeout for rpc_sleep() */
        unsigned long           tk_runstate;    /* Task run status */
-       struct workqueue_struct *tk_workqueue;  /* Normally rpciod, but could
-                                                * be any workqueue
-                                                */
+
        struct rpc_wait_queue   *tk_waitqueue;  /* RPC wait queue we're on */
        union {
                struct work_struct      tk_work;        /* Async task work queue */
                struct rpc_wait         tk_wait;        /* RPC wait */
        } u;
 
+       /*
+        * RPC call state
+        */
+       struct rpc_message      tk_msg;         /* RPC call info */
+       void *                  tk_calldata;    /* Caller private data */
+       const struct rpc_call_ops *tk_ops;      /* Caller callbacks */
+
+       struct rpc_clnt *       tk_client;      /* RPC client */
+       struct rpc_xprt *       tk_xprt;        /* Transport */
+
+       struct rpc_rqst *       tk_rqstp;       /* RPC request */
+
+       struct workqueue_struct *tk_workqueue;  /* Normally rpciod, but could
+                                                * be any workqueue
+                                                */
        ktime_t                 tk_start;       /* RPC task init timestamp */
 
        pid_t                   tk_owner;       /* Process id for batching tasks */
-       int                     tk_status;      /* result of last operation */
        unsigned short          tk_flags;       /* misc flags */
        unsigned short          tk_timeouts;    /* maj timeouts */
 
@@ -100,6 +103,7 @@ struct rpc_call_ops {
 struct rpc_task_setup {
        struct rpc_task *task;
        struct rpc_clnt *rpc_client;
+       struct rpc_xprt *rpc_xprt;
        const struct rpc_message *rpc_message;
        const struct rpc_call_ops *callback_ops;
        void *callback_data;
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 69ef5b3ab038334ea51d2b9dfbe38144f33e4a32..fb0d212e0d3af2dd3ed78d4ec181e63f8e72dac1 100644
@@ -13,6 +13,7 @@
 #include <linux/socket.h>
 #include <linux/in.h>
 #include <linux/ktime.h>
+#include <linux/kref.h>
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/msg_prot.h>
@@ -166,7 +167,7 @@ enum xprt_transports {
 };
 
 struct rpc_xprt {
-       atomic_t                count;          /* Reference count */
+       struct kref             kref;           /* Reference count */
        struct rpc_xprt_ops *   ops;            /* transport methods */
 
        const struct rpc_timeout *timeout;      /* timeout parms */
@@ -196,6 +197,11 @@ struct rpc_xprt {
                                                   transport */
        unsigned int            bind_index;     /* bind function index */
 
+       /*
+        * Multipath
+        */
+       struct list_head        xprt_switch;
+
        /*
         * Connection of transports
         */
@@ -256,6 +262,7 @@ struct rpc_xprt {
        struct dentry           *debugfs;               /* debugfs directory */
        atomic_t                inject_disconnect;
 #endif
+       struct rcu_head         rcu;
 };
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
@@ -318,24 +325,13 @@ int                       xprt_adjust_timeout(struct rpc_rqst *req);
 void                   xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
 void                   xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 void                   xprt_release(struct rpc_task *task);
+struct rpc_xprt *      xprt_get(struct rpc_xprt *xprt);
 void                   xprt_put(struct rpc_xprt *xprt);
 struct rpc_xprt *      xprt_alloc(struct net *net, size_t size,
                                unsigned int num_prealloc,
                                unsigned int max_req);
 void                   xprt_free(struct rpc_xprt *);
 
-/**
- * xprt_get - return a reference to an RPC transport.
- * @xprt: pointer to the transport
- *
- */
-static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
-{
-       if (atomic_inc_not_zero(&xprt->count))
-               return xprt;
-       return NULL;
-}
-
 static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
 {
        return p + xprt->tsh_size;
diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h
new file mode 100644
index 0000000..5a9acff
--- /dev/null
+++ b/include/linux/sunrpc/xprtmultipath.h
@@ -0,0 +1,69 @@
+/*
+ * RPC client multipathing definitions
+ *
+ * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
+ *
+ * Trond Myklebust <trond.myklebust@primarydata.com>
+ */
+#ifndef _NET_SUNRPC_XPRTMULTIPATH_H
+#define _NET_SUNRPC_XPRTMULTIPATH_H
+
+struct rpc_xprt_iter_ops;
+struct rpc_xprt_switch {
+       spinlock_t              xps_lock;
+       struct kref             xps_kref;
+
+       unsigned int            xps_nxprts;
+       struct list_head        xps_xprt_list;
+
+       struct net *            xps_net;
+
+       const struct rpc_xprt_iter_ops *xps_iter_ops;
+
+       struct rcu_head         xps_rcu;
+};
+
+struct rpc_xprt_iter {
+       struct rpc_xprt_switch __rcu *xpi_xpswitch;
+       struct rpc_xprt *       xpi_cursor;
+
+       const struct rpc_xprt_iter_ops *xpi_ops;
+};
+
+
+struct rpc_xprt_iter_ops {
+       void (*xpi_rewind)(struct rpc_xprt_iter *);
+       struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *);
+       struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *);
+};
+
+extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
+               gfp_t gfp_flags);
+
+extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps);
+extern void xprt_switch_put(struct rpc_xprt_switch *xps);
+
+extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps);
+
+extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt);
+extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt);
+
+extern void xprt_iter_init(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *xps);
+
+extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *xps);
+
+extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi);
+
+extern struct rpc_xprt_switch *xprt_iter_xchg_switch(
+               struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *newswitch);
+
+extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi);
+extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi);
+extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi);
+
+#endif
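
A switch is consumed through the iterator rather than walked directly; the usage pattern (which rpc_clnt_iterate_for_each_xprt() in net/sunrpc/clnt.c follows) is roughly:

static void visit_all(struct rpc_xprt_switch *xps)
{
	struct rpc_xprt_iter xpi;
	struct rpc_xprt *xprt;

	xprt_iter_init_listall(&xpi, xps);
	/* xprt_iter_get_next() returns a referenced xprt, or NULL when
	 * the list is exhausted; the caller drops each reference. */
	while ((xprt = xprt_iter_get_next(&xpi)) != NULL) {
		dprintk("visiting transport to %s\n", xprt->servername);
		xprt_put(xprt);
	}
	xprt_iter_destroy(&xpi);
}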
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index b7b279b545049c174bf10a46e16b013a04efa9dd..767190b013638f2aa0c5e70a3f338adbc6db10a8 100644
@@ -54,8 +54,6 @@
 
 #define RPCRDMA_DEF_INLINE  (1024)     /* default inline max */
 
-#define RPCRDMA_INLINE_PAD_THRESH  (512)/* payload threshold to pad (bytes) */
-
 /* Memory registration strategies, by number.
  * This is part of a kernel / user space API. Do not remove. */
 enum rpcrdma_memreg {
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index b512fbd9d79a403ee980d40c1a21fc3fde47f215..ea7ffa12e0f9ed66023ba397c9e4d365a1bb1d56 100644
@@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
            svc.o svcsock.o svcauth.o svcauth_unix.o \
            addr.o rpcb_clnt.o timer.o xdr.o \
            sunrpc_syms.o cache.o rpc_pipe.o \
-           svc_xprt.o
+           svc_xprt.o \
+           xprtmultipath.o
 sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
 sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index cabf586f47d7d0d75a8cb77b44d497e76a34abd7..8c6bc795f0602a9991fc2b0782b965d96d6b5bf9 100644
@@ -1181,12 +1181,12 @@ static struct rpc_auth *
 gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
        struct gss_auth *gss_auth;
-       struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt);
+       struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
 
        while (clnt != clnt->cl_parent) {
                struct rpc_clnt *parent = clnt->cl_parent;
                /* Find the original parent for this transport */
-               if (rcu_access_pointer(parent->cl_xprt) != xprt)
+               if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
                        break;
                clnt = parent;
        }
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index b7f21044f4d8cb4ab2012566fa80c6b412fa3d2b..7e0c9bf22df811475385496007cfac321e0422a5 100644
@@ -354,6 +354,7 @@ static void rpc_free_clid(struct rpc_clnt *clnt)
 }
 
 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
+               struct rpc_xprt_switch *xps,
                struct rpc_xprt *xprt,
                struct rpc_clnt *parent)
 {
@@ -411,6 +412,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
        }
 
        rpc_clnt_set_transport(clnt, xprt, timeout);
+       xprt_iter_init(&clnt->cl_xpi, xps);
+       xprt_switch_put(xps);
 
        clnt->cl_rtt = &clnt->cl_rtt_default;
        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
@@ -438,6 +441,7 @@ out_no_clid:
 out_err:
        rpciod_down();
 out_no_rpciod:
+       xprt_switch_put(xps);
        xprt_put(xprt);
        return ERR_PTR(err);
 }
@@ -446,8 +450,13 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
                                        struct rpc_xprt *xprt)
 {
        struct rpc_clnt *clnt = NULL;
+       struct rpc_xprt_switch *xps;
 
-       clnt = rpc_new_client(args, xprt, NULL);
+       xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+       if (xps == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       clnt = rpc_new_client(args, xps, xprt, NULL);
        if (IS_ERR(clnt))
                return clnt;
 
@@ -564,6 +573,7 @@ EXPORT_SYMBOL_GPL(rpc_create);
 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
                                           struct rpc_clnt *clnt)
 {
+       struct rpc_xprt_switch *xps;
        struct rpc_xprt *xprt;
        struct rpc_clnt *new;
        int err;
@@ -571,13 +581,17 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
        err = -ENOMEM;
        rcu_read_lock();
        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+       xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
        rcu_read_unlock();
-       if (xprt == NULL)
+       if (xprt == NULL || xps == NULL) {
+               xprt_put(xprt);
+               xprt_switch_put(xps);
                goto out_err;
+       }
        args->servername = xprt->servername;
        args->nodename = clnt->cl_nodename;
 
-       new = rpc_new_client(args, xprt, clnt);
+       new = rpc_new_client(args, xps, xprt, clnt);
        if (IS_ERR(new)) {
                err = PTR_ERR(new);
                goto out_err;
@@ -657,6 +671,7 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
 {
        const struct rpc_timeout *old_timeo;
        rpc_authflavor_t pseudoflavor;
+       struct rpc_xprt_switch *xps, *oldxps;
        struct rpc_xprt *xprt, *old;
        struct rpc_clnt *parent;
        int err;
@@ -668,10 +683,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
                return PTR_ERR(xprt);
        }
 
+       xps = xprt_switch_alloc(xprt, GFP_KERNEL);
+       if (xps == NULL) {
+               xprt_put(xprt);
+               return -ENOMEM;
+       }
+
        pseudoflavor = clnt->cl_auth->au_flavor;
 
        old_timeo = clnt->cl_timeout;
        old = rpc_clnt_set_transport(clnt, xprt, timeout);
+       oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
 
        rpc_unregister_client(clnt);
        __rpc_clnt_remove_pipedir(clnt);
@@ -697,20 +719,74 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt,
        synchronize_rcu();
        if (parent != clnt)
                rpc_release_client(parent);
+       xprt_switch_put(oldxps);
        xprt_put(old);
        dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
        return 0;
 
 out_revert:
+       xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
        rpc_clnt_set_transport(clnt, old, old_timeo);
        clnt->cl_parent = parent;
        rpc_client_register(clnt, pseudoflavor, NULL);
+       xprt_switch_put(xps);
        xprt_put(xprt);
        dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
        return err;
 }
 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
 
+static
+int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
+{
+       struct rpc_xprt_switch *xps;
+
+       rcu_read_lock();
+       xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+       rcu_read_unlock();
+       if (xps == NULL)
+               return -EAGAIN;
+       xprt_iter_init_listall(xpi, xps);
+       xprt_switch_put(xps);
+       return 0;
+}
+
+/**
+ * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
+ * @clnt: pointer to client
+ * @fn: function to apply
+ * @data: void pointer to function data
+ *
+ * Iterates through the list of RPC transports currently attached to the
+ * client and applies the function fn(clnt, xprt, data).
+ *
+ * On error, the iteration stops, and the function returns the error value.
+ */
+int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
+               int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
+               void *data)
+{
+       struct rpc_xprt_iter xpi;
+       int ret;
+
+       ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
+       if (ret)
+               return ret;
+       for (;;) {
+               struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
+
+               if (!xprt)
+                       break;
+               ret = fn(clnt, xprt, data);
+               xprt_put(xprt);
+               if (ret < 0)
+                       break;
+       }
+       xprt_iter_destroy(&xpi);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
+
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
@@ -783,6 +859,7 @@ rpc_free_client(struct rpc_clnt *clnt)
        rpc_free_iostats(clnt->cl_metrics);
        clnt->cl_metrics = NULL;
        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
+       xprt_iter_destroy(&clnt->cl_xpi);
        rpciod_down();
        rpc_free_clid(clnt);
        kfree(clnt);
@@ -868,6 +945,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program);
 void rpc_task_release_client(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
+       struct rpc_xprt *xprt = task->tk_xprt;
 
        if (clnt != NULL) {
                /* Remove from client task list */
@@ -878,13 +956,22 @@ void rpc_task_release_client(struct rpc_task *task)
 
                rpc_release_client(clnt);
        }
+
+       if (xprt != NULL) {
+               task->tk_xprt = NULL;
+
+               xprt_put(xprt);
+       }
 }
 
 static
 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 {
+
        if (clnt != NULL) {
                rpc_task_release_client(task);
+               if (task->tk_xprt == NULL)
+                       task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
                task->tk_client = clnt;
                atomic_inc(&clnt->cl_count);
                if (clnt->cl_softrtry)
@@ -900,14 +987,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
        }
 }
 
-void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
-{
-       rpc_task_release_client(task);
-       rpc_task_set_client(task, clnt);
-}
-EXPORT_SYMBOL_GPL(rpc_task_reset_client);
-
-
 static void
 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
 {
@@ -2104,11 +2183,9 @@ call_timeout(struct rpc_task *task)
        }
        if (RPC_IS_SOFT(task)) {
                if (clnt->cl_chatty) {
-                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_program->name,
-                               rcu_dereference(clnt->cl_xprt)->servername);
-                       rcu_read_unlock();
+                               task->tk_xprt->servername);
                }
                if (task->tk_flags & RPC_TASK_TIMEOUT)
                        rpc_exit(task, -ETIMEDOUT);
@@ -2120,11 +2197,9 @@ call_timeout(struct rpc_task *task)
        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                if (clnt->cl_chatty) {
-                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_program->name,
-                       rcu_dereference(clnt->cl_xprt)->servername);
-                       rcu_read_unlock();
+                       task->tk_xprt->servername);
                }
        }
        rpc_force_rebind(clnt);
@@ -2154,11 +2229,9 @@ call_decode(struct rpc_task *task)
 
        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
                if (clnt->cl_chatty) {
-                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s OK\n",
                                clnt->cl_program->name,
-                               rcu_dereference(clnt->cl_xprt)->servername);
-                       rcu_read_unlock();
+                               task->tk_xprt->servername);
                }
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }
@@ -2312,11 +2385,9 @@ rpc_verify_header(struct rpc_task *task)
                        task->tk_action = call_bind;
                        goto out_retry;
                case RPC_AUTH_TOOWEAK:
-                       rcu_read_lock();
                        printk(KERN_NOTICE "RPC: server %s requires stronger "
                               "authentication.\n",
-                              rcu_dereference(clnt->cl_xprt)->servername);
-                       rcu_read_unlock();
+                              task->tk_xprt->servername);
                        break;
                default:
                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
@@ -2341,27 +2412,27 @@ rpc_verify_header(struct rpc_task *task)
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
-               dprintk_rcu("RPC: %5u %s: program %u is unsupported "
+               dprintk("RPC: %5u %s: program %u is unsupported "
                                "by server %s\n", task->tk_pid, __func__,
                                (unsigned int)clnt->cl_prog,
-                               rcu_dereference(clnt->cl_xprt)->servername);
+                               task->tk_xprt->servername);
                error = -EPFNOSUPPORT;
                goto out_err;
        case RPC_PROG_MISMATCH:
-               dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
+               dprintk("RPC: %5u %s: program %u, version %u unsupported "
                                "by server %s\n", task->tk_pid, __func__,
                                (unsigned int)clnt->cl_prog,
                                (unsigned int)clnt->cl_vers,
-                               rcu_dereference(clnt->cl_xprt)->servername);
+                               task->tk_xprt->servername);
                error = -EPROTONOSUPPORT;
                goto out_err;
        case RPC_PROC_UNAVAIL:
-               dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
+               dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
                                "version %u on server %s\n",
                                task->tk_pid, __func__,
                                rpc_proc_name(task),
                                clnt->cl_prog, clnt->cl_vers,
-                               rcu_dereference(clnt->cl_xprt)->servername);
+                               task->tk_xprt->servername);
                error = -EOPNOTSUPP;
                goto out_err;
        case RPC_GARBAGE_ARGS:
@@ -2421,7 +2492,10 @@ static int rpc_ping(struct rpc_clnt *clnt)
        return err;
 }
 
-struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+static
+struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
+               struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
+               const struct rpc_call_ops *ops, void *data)
 {
        struct rpc_message msg = {
                .rpc_proc = &rpcproc_null,
@@ -2429,14 +2503,140 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int
        };
        struct rpc_task_setup task_setup_data = {
                .rpc_client = clnt,
+               .rpc_xprt = xprt,
                .rpc_message = &msg,
-               .callback_ops = &rpc_default_ops,
+               .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
+               .callback_data = data,
                .flags = flags,
        };
+
        return rpc_run_task(&task_setup_data);
 }
+
+struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
+{
+       return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_call_null);
 
+struct rpc_cb_add_xprt_calldata {
+       struct rpc_xprt_switch *xps;
+       struct rpc_xprt *xprt;
+};
+
+static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
+{
+       struct rpc_cb_add_xprt_calldata *data = calldata;
+
+       if (task->tk_status == 0)
+               rpc_xprt_switch_add_xprt(data->xps, data->xprt);
+}
+
+static void rpc_cb_add_xprt_release(void *calldata)
+{
+       struct rpc_cb_add_xprt_calldata *data = calldata;
+
+       xprt_put(data->xprt);
+       xprt_switch_put(data->xps);
+       kfree(data);
+}
+
+const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
+       .rpc_call_done = rpc_cb_add_xprt_done,
+       .rpc_release = rpc_cb_add_xprt_release,
+};
+
+/**
+ * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xps: pointer to struct rpc_xprt_switch,
+ * @xprt: pointer struct rpc_xprt
+ * @dummy: unused
+ */
+int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
+               struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
+               void *dummy)
+{
+       struct rpc_cb_add_xprt_calldata *data;
+       struct rpc_cred *cred;
+       struct rpc_task *task;
+
+       data = kmalloc(sizeof(*data), GFP_NOFS);
+       if (!data)
+               return -ENOMEM;
+       data->xps = xprt_switch_get(xps);
+       data->xprt = xprt_get(xprt);
+
+       cred = authnull_ops.lookup_cred(NULL, NULL, 0);
+       task = rpc_call_null_helper(clnt, xprt, cred,
+                       RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC,
+                       &rpc_cb_add_xprt_call_ops, data);
+       put_rpccred(cred);
+       if (IS_ERR(task))
+               return PTR_ERR(task);
+       rpc_put_task(task);
+       return 1;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
+
+/**
+ * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
+ * @clnt: pointer to struct rpc_clnt
+ * @xprtargs: pointer to struct xprt_create
+ * @setup: callback to test and/or set up the connection
+ * @data: pointer to setup function data
+ *
+ * Creates a new transport using the parameters set in args and
+ * adds it to clnt.
+ * If ping is set, then test that connectivity succeeds before
+ * adding the new transport.
+ *
+ */
+int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
+               struct xprt_create *xprtargs,
+               int (*setup)(struct rpc_clnt *,
+                       struct rpc_xprt_switch *,
+                       struct rpc_xprt *,
+                       void *),
+               void *data)
+{
+       struct rpc_xprt_switch *xps;
+       struct rpc_xprt *xprt;
+       unsigned char resvport;
+       int ret = 0;
+
+       rcu_read_lock();
+       xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+       xprt = xprt_iter_xprt(&clnt->cl_xpi);
+       if (xps == NULL || xprt == NULL) {
+               rcu_read_unlock();
+               return -EAGAIN;
+       }
+       resvport = xprt->resvport;
+       rcu_read_unlock();
+
+       xprt = xprt_create_transport(xprtargs);
+       if (IS_ERR(xprt)) {
+               ret = PTR_ERR(xprt);
+               goto out_put_switch;
+       }
+       xprt->resvport = resvport;
+
+       rpc_xprt_switch_set_roundrobin(xps);
+       if (setup) {
+               ret = setup(clnt, xps, xprt, data);
+               if (ret != 0)
+                       goto out_put_xprt;
+       }
+       rpc_xprt_switch_add_xprt(xps, xprt);
+out_put_xprt:
+       xprt_put(xprt);
+out_put_switch:
+       xprt_switch_put(xps);
+       return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
+
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 static void rpc_show_header(void)
 {
@@ -2483,57 +2683,39 @@ void rpc_show_tasks(struct net *net)
 #endif
 
 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+static int
+rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
+               struct rpc_xprt *xprt,
+               void *dummy)
+{
+       return xprt_enable_swap(xprt);
+}
+
 int
 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
 {
-       int ret = 0;
-       struct rpc_xprt *xprt;
-
-       if (atomic_inc_return(&clnt->cl_swapper) == 1) {
-retry:
-               rcu_read_lock();
-               xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-               rcu_read_unlock();
-               if (!xprt) {
-                       /*
-                        * If we didn't get a reference, then we likely are
-                        * racing with a migration event. Wait for a grace
-                        * period and try again.
-                        */
-                       synchronize_rcu();
-                       goto retry;
-               }
-
-               ret = xprt_enable_swap(xprt);
-               xprt_put(xprt);
-       }
-       return ret;
+       if (atomic_inc_return(&clnt->cl_swapper) == 1)
+               return rpc_clnt_iterate_for_each_xprt(clnt,
+                               rpc_clnt_swap_activate_callback, NULL);
+       return 0;
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
 
+static int
+rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
+               struct rpc_xprt *xprt,
+               void *dummy)
+{
+       xprt_disable_swap(xprt);
+       return 0;
+}
+
 void
 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
 {
-       struct rpc_xprt *xprt;
-
-       if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
-retry:
-               rcu_read_lock();
-               xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-               rcu_read_unlock();
-               if (!xprt) {
-                       /*
-                        * If we didn't get a reference, then we likely are
-                        * racing with a migration event. Wait for a grace
-                        * period and try again.
-                        */
-                       synchronize_rcu();
-                       goto retry;
-               }
-
-               xprt_disable_swap(xprt);
-               xprt_put(xprt);
-       }
+       if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
+               rpc_clnt_iterate_for_each_xprt(clnt,
+                               rpc_clnt_swap_deactivate_callback, NULL);
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
 #endif /* CONFIG_SUNRPC_SWAP */
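The two swap hooks above follow the per-transport callback convention of
rpc_clnt_iterate_for_each_xprt(). A sketch, assuming (as the hooks above
rely on) that the iterator helper added earlier in this series walks every
transport in the switch and stops early on a negative return:

static int example_count_xprt(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt, void *data)
{
	(*(unsigned int *)data)++;
	return 0;	/* 0 continues the walk; a negative value stops it */
}

static unsigned int example_nr_transports(struct rpc_clnt *clnt)
{
	unsigned int count = 0;

	rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprt, &count);
	return count;
}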
index cf5770d8f49af952a930ce8e8793d09e723f7cf5..5b30603596d0c7c5c5a4d3e3ffdca259c65ba343 100644 (file)
@@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
 static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
 {
        struct rpc_clnt *parent = clnt->cl_parent;
-       struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt);
+       struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
 
        while (parent != clnt) {
-               if (rcu_dereference(parent->cl_xprt) != xprt)
+               if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
                        break;
                if (clnt->cl_autobind)
                        break;
@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task)
        int status;
 
        rcu_read_lock();
-       do {
-               clnt = rpcb_find_transport_owner(task->tk_client);
-               xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
-       } while (xprt == NULL);
+       clnt = rpcb_find_transport_owner(task->tk_client);
        rcu_read_unlock();
+       xprt = xprt_get(task->tk_xprt);
 
        dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
                task->tk_pid, __func__,
index 73ad57a59989cb9234d06c1d399e02408e50c7d3..fcfd48d263f64f1f52ef317a9a8974a8a457196e 100644 (file)
@@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = task_setup_data->workqueue;
 
+       task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
+
        if (task->tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
 
index 37edea6fa92d9685570ad3c1b37cb5fda5c0b9b3..216a1385718a27e9f86516720d28d61b9aaac609 100644 (file)
@@ -48,6 +48,7 @@
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/metrics.h>
 #include <linux/sunrpc/bc_xprt.h>
+#include <linux/rcupdate.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt)
 {
        put_net(xprt->xprt_net);
        xprt_free_all_slots(xprt);
-       kfree(xprt);
+       kfree_rcu(xprt, rcu);
 }
 EXPORT_SYMBOL_GPL(xprt_free);
 
@@ -1180,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free);
  */
 void xprt_reserve(struct rpc_task *task)
 {
-       struct rpc_xprt *xprt;
+       struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = 0;
        if (task->tk_rqstp != NULL)
@@ -1188,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task)
 
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       rcu_read_lock();
-       xprt = rcu_dereference(task->tk_client->cl_xprt);
        if (!xprt_throttle_congested(xprt, task))
                xprt->ops->alloc_slot(xprt, task);
-       rcu_read_unlock();
 }
 
 /**
@@ -1206,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task)
  */
 void xprt_retry_reserve(struct rpc_task *task)
 {
-       struct rpc_xprt *xprt;
+       struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = 0;
        if (task->tk_rqstp != NULL)
@@ -1214,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task)
 
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       rcu_read_lock();
-       xprt = rcu_dereference(task->tk_client->cl_xprt);
        xprt->ops->alloc_slot(xprt, task);
-       rcu_read_unlock();
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1264,11 +1259,9 @@ void xprt_release(struct rpc_task *task)
 
        if (req == NULL) {
                if (task->tk_client) {
-                       rcu_read_lock();
-                       xprt = rcu_dereference(task->tk_client->cl_xprt);
+                       xprt = task->tk_xprt;
                        if (xprt->snd_task == task)
                                xprt_release_write(xprt, task);
-                       rcu_read_unlock();
                }
                return;
        }
@@ -1307,7 +1300,7 @@ void xprt_release(struct rpc_task *task)
 
 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 {
-       atomic_set(&xprt->count, 1);
+       kref_init(&xprt->kref);
 
        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
@@ -1318,6 +1311,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
+       INIT_LIST_HEAD(&xprt->xprt_switch);
 
        xprt->last_used = jiffies;
        xprt->cwnd = RPC_INITCWND;
@@ -1415,6 +1409,24 @@ static void xprt_destroy(struct rpc_xprt *xprt)
        xprt->ops->destroy(xprt);
 }
 
+static void xprt_destroy_kref(struct kref *kref)
+{
+       xprt_destroy(container_of(kref, struct rpc_xprt, kref));
+}
+
+/**
+ * xprt_get - return a reference to an RPC transport.
+ * @xprt: pointer to the transport
+ *
+ */
+struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
+{
+       if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
+               return xprt;
+       return NULL;
+}
+EXPORT_SYMBOL_GPL(xprt_get);
+
 /**
  * xprt_put - release a reference to an RPC transport.
  * @xprt: pointer to the transport
@@ -1422,7 +1434,7 @@ static void xprt_destroy(struct rpc_xprt *xprt)
  */
 void xprt_put(struct rpc_xprt *xprt)
 {
-       if (atomic_dec_and_test(&xprt->count))
-               xprt_destroy(xprt);
+       if (xprt != NULL)
+               kref_put(&xprt->kref, xprt_destroy_kref);
 }
 EXPORT_SYMBOL_GPL(xprt_put);
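With xprt->count replaced by a kref and xprt_free() deferred via
kfree_rcu(), an RCU reader can try to pin a transport it found in a list
linked through ->xprt_switch; xprt_get() simply returns NULL once the last
reference is gone. A sketch of that pattern (illustrative, not from this
patch):

static struct rpc_xprt *example_pin_first(struct list_head *head)
{
	struct rpc_xprt *pos, *found = NULL;

	rcu_read_lock();
	list_for_each_entry_rcu(pos, head, xprt_switch) {
		found = xprt_get(pos);	/* NULL if the refcount hit zero */
		if (found)
			break;
	}
	rcu_read_unlock();
	return found;
}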
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
new file mode 100644 (file)
index 0000000..e7fd769
--- /dev/null
@@ -0,0 +1,475 @@
+/*
+ * Multipath support for RPC
+ *
+ * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
+ *
+ * Trond Myklebust <trond.myklebust@primarydata.com>
+ *
+ */
+#include <linux/types.h>
+#include <linux/kref.h>
+#include <linux/list.h>
+#include <linux/rcupdate.h>
+#include <linux/rculist.h>
+#include <linux/slab.h>
+#include <asm/cmpxchg.h>
+#include <linux/spinlock.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/xprtmultipath.h>
+
+typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
+               const struct rpc_xprt *cur);
+
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
+static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
+
+static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt)
+{
+       if (unlikely(xprt_get(xprt) == NULL))
+               return;
+       list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
+       smp_wmb();
+       if (xps->xps_nxprts == 0)
+               xps->xps_net = xprt->xprt_net;
+       xps->xps_nxprts++;
+}
+
+/**
+ * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ *
+ * Adds xprt to the end of the list of struct rpc_xprt in xps.
+ */
+void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt)
+{
+       if (xprt == NULL)
+               return;
+       spin_lock(&xps->xps_lock);
+       if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
+               xprt_switch_add_xprt_locked(xps, xprt);
+       spin_unlock(&xps->xps_lock);
+}
+
+static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt)
+{
+       if (unlikely(xprt == NULL))
+               return;
+       xps->xps_nxprts--;
+       if (xps->xps_nxprts == 0)
+               xps->xps_net = NULL;
+       smp_wmb();
+       list_del_rcu(&xprt->xprt_switch);
+}
+
+/**
+ * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ *
+ * Removes xprt from the list of struct rpc_xprt in xps.
+ */
+void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
+               struct rpc_xprt *xprt)
+{
+       spin_lock(&xps->xps_lock);
+       xprt_switch_remove_xprt_locked(xps, xprt);
+       spin_unlock(&xps->xps_lock);
+       xprt_put(xprt);
+}
+
+/**
+ * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
+ * @xprt: pointer to struct rpc_xprt
+ * @gfp_flags: allocation flags
+ *
+ * On success, returns an initialised struct rpc_xprt_switch, containing
+ * the entry xprt. Returns NULL on failure.
+ */
+struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
+               gfp_t gfp_flags)
+{
+       struct rpc_xprt_switch *xps;
+
+       xps = kmalloc(sizeof(*xps), gfp_flags);
+       if (xps != NULL) {
+               spin_lock_init(&xps->xps_lock);
+               kref_init(&xps->xps_kref);
+               xps->xps_nxprts = 0;
+               INIT_LIST_HEAD(&xps->xps_xprt_list);
+               xps->xps_iter_ops = &rpc_xprt_iter_singular;
+               xprt_switch_add_xprt_locked(xps, xprt);
+       }
+
+       return xps;
+}
+
+static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
+{
+       spin_lock(&xps->xps_lock);
+       while (!list_empty(&xps->xps_xprt_list)) {
+               struct rpc_xprt *xprt;
+
+               xprt = list_first_entry(&xps->xps_xprt_list,
+                               struct rpc_xprt, xprt_switch);
+               xprt_switch_remove_xprt_locked(xps, xprt);
+               spin_unlock(&xps->xps_lock);
+               xprt_put(xprt);
+               spin_lock(&xps->xps_lock);
+       }
+       spin_unlock(&xps->xps_lock);
+}
+
+static void xprt_switch_free(struct kref *kref)
+{
+       struct rpc_xprt_switch *xps = container_of(kref,
+                       struct rpc_xprt_switch, xps_kref);
+
+       xprt_switch_free_entries(xps);
+       kfree_rcu(xps, xps_rcu);
+}
+
+/**
+ * xprt_switch_get - Return a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Returns a reference to xps unless the refcount is already zero.
+ */
+struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
+{
+       if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
+               return xps;
+       return NULL;
+}
+
+/**
+ * xprt_switch_put - Release a reference to a rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Release the reference to xps, and free it once the refcount is zero.
+ */
+void xprt_switch_put(struct rpc_xprt_switch *xps)
+{
+       if (xps != NULL)
+               kref_put(&xps->xps_kref, xprt_switch_free);
+}
+
+/**
+ * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
+ * @xps: pointer to struct rpc_xprt_switch
+ *
+ * Sets a round-robin default policy for iterators acting on xps.
+ */
+void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
+{
+       if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
+               WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
+}
+
+static
+const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
+{
+       if (xpi->xpi_ops != NULL)
+               return xpi->xpi_ops;
+       return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
+}
+
+static
+void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
+{
+}
+
+static
+void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
+{
+       WRITE_ONCE(xpi->xpi_cursor, NULL);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
+{
+       return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
+}
+
+static
+struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
+{
+       struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+
+       if (xps == NULL)
+               return NULL;
+       return xprt_switch_find_first_entry(&xps->xps_xprt_list);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
+               const struct rpc_xprt *cur)
+{
+       struct rpc_xprt *pos;
+
+       list_for_each_entry_rcu(pos, head, xprt_switch) {
+               if (cur == pos)
+                       return pos;
+       }
+       return NULL;
+}
+
+static
+struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
+{
+       struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+       struct list_head *head;
+
+       if (xps == NULL)
+               return NULL;
+       head = &xps->xps_xprt_list;
+       if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
+               return xprt_switch_find_first_entry(head);
+       return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
+               const struct rpc_xprt *cur)
+{
+       struct rpc_xprt *pos, *prev = NULL;
+
+       list_for_each_entry_rcu(pos, head, xprt_switch) {
+               if (cur == prev)
+                       return pos;
+               prev = pos;
+       }
+       return NULL;
+}
+
+static
+struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
+               struct rpc_xprt **cursor,
+               xprt_switch_find_xprt_t find_next)
+{
+       struct rpc_xprt *cur, *pos, *old;
+
+       cur = READ_ONCE(*cursor);
+       for (;;) {
+               old = cur;
+               pos = find_next(head, old);
+               if (pos == NULL)
+                       break;
+               cur = cmpxchg_relaxed(cursor, old, pos);
+               if (cur == old)
+                       break;
+       }
+       return pos;
+}
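A note on why the retry loop above is safe without a lock:

/* xprt_switch_set_next_cursor() computes the successor of the cursor
 * value it last observed and publishes it with cmpxchg(). If another
 * walker moved the cursor first, the cmpxchg returns that newer value
 * and the loop recomputes from there, so concurrent round-robin
 * iterators advance past each other rather than rewinding the cursor.
 */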
+
+static
+struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
+               xprt_switch_find_xprt_t find_next)
+{
+       struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
+       struct list_head *head;
+
+       if (xps == NULL)
+               return NULL;
+       head = &xps->xps_xprt_list;
+       if (xps->xps_nxprts < 2)
+               return xprt_switch_find_first_entry(head);
+       return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
+}
+
+static
+struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
+               const struct rpc_xprt *cur)
+{
+       struct rpc_xprt *ret;
+
+       ret = xprt_switch_find_next_entry(head, cur);
+       if (ret != NULL)
+               return ret;
+       return xprt_switch_find_first_entry(head);
+}
+
+static
+struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
+{
+       return xprt_iter_next_entry_multiple(xpi,
+                       xprt_switch_find_next_entry_roundrobin);
+}
+
+static
+struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
+{
+       return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
+}
+
+/*
+ * xprt_iter_rewind - Resets the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Resets xpi to ensure that it points to the first entry in the list
+ * of transports.
+ */
+static
+void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
+{
+       rcu_read_lock();
+       xprt_iter_ops(xpi)->xpi_rewind(xpi);
+       rcu_read_unlock();
+}
+
+static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *xps,
+               const struct rpc_xprt_iter_ops *ops)
+{
+       rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
+       xpi->xpi_cursor = NULL;
+       xpi->xpi_ops = ops;
+}
+
+/**
+ * xprt_iter_init - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to use the default iterator ops
+ * as set in xps. This function is mainly intended for internal
+ * use by the RPC client code.
+ */
+void xprt_iter_init(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *xps)
+{
+       __xprt_iter_init(xpi, xps, NULL);
+}
+
+/**
+ * xprt_iter_init_listall - Initialise an xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ * @xps: pointer to rpc_xprt_switch
+ *
+ * Initialises the iterator to iterate once through the entire list
+ * of entries in xps.
+ */
+void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *xps)
+{
+       __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
+}
+
+/**
+ * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
+ * @xpi: pointer to rpc_xprt_iter
+ * @newswitch: pointer to a new rpc_xprt_switch or NULL
+ *
+ * Swaps out the existing xpi->xpi_xpswitch with a new value.
+ */
+struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt_switch *newswitch)
+{
+       struct rpc_xprt_switch __rcu *oldswitch;
+
+       /* Atomically swap out the old xpswitch */
+       oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
+       if (newswitch != NULL)
+               xprt_iter_rewind(xpi);
+       return rcu_dereference_protected(oldswitch, true);
+}
+
+/**
+ * xprt_iter_destroy - Destroys the xprt iterator
+ * @xpi: pointer to rpc_xprt_iter
+ */
+void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
+{
+       xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
+}
+
+/**
+ * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a pointer to the struct rpc_xprt that is currently
+ * pointed to by the cursor.
+ * Caller must be holding rcu_read_lock().
+ */
+struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
+{
+       WARN_ON_ONCE(!rcu_read_lock_held());
+       return xprt_iter_ops(xpi)->xpi_xprt(xpi);
+}
+
+static
+struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
+               struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
+{
+       struct rpc_xprt *ret;
+
+       do {
+               ret = fn(xpi);
+               if (ret == NULL)
+                       break;
+               ret = xprt_get(ret);
+       } while (ret == NULL);
+       return ret;
+}
+
+/**
+ * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that is currently
+ * pointed to by the cursor.
+ */
+struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
+{
+       struct rpc_xprt *xprt;
+
+       rcu_read_lock();
+       xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
+       rcu_read_unlock();
+       return xprt;
+}
+
+/**
+ * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
+ * @xpi: pointer to rpc_xprt_iter
+ *
+ * Returns a reference to the struct rpc_xprt that immediately follows the
+ * entry pointed to by the cursor.
+ */
+struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
+{
+       struct rpc_xprt *xprt;
+
+       rcu_read_lock();
+       xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
+       rcu_read_unlock();
+       return xprt;
+}
+
+/* Policy for always returning the first entry in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
+       .xpi_rewind = xprt_iter_no_rewind,
+       .xpi_xprt = xprt_iter_first_entry,
+       .xpi_next = xprt_iter_first_entry,
+};
+
+/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
+       .xpi_rewind = xprt_iter_default_rewind,
+       .xpi_xprt = xprt_iter_current_entry,
+       .xpi_next = xprt_iter_next_entry_roundrobin,
+};
+
+/* Policy for once-through iteration of entries in the rpc_xprt_switch */
+static
+const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
+       .xpi_rewind = xprt_iter_default_rewind,
+       .xpi_xprt = xprt_iter_current_entry,
+       .xpi_next = xprt_iter_next_entry_all,
+};
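A sketch of once-through iteration using the listall policy and the
accessors defined above:

static void example_walk_switch(struct rpc_xprt_switch *xps)
{
	struct rpc_xprt_iter xpi;
	struct rpc_xprt *xprt;

	xprt_iter_init_listall(&xpi, xps);
	while ((xprt = xprt_iter_get_next(&xpi)) != NULL) {
		/* ... inspect or use xprt ... */
		xprt_put(xprt);
	}
	xprt_iter_destroy(&xpi);
}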
index c14f3a4bff6826aea365804eb4201c24d3e59b84..b289e106540bf9d85d0d1e1e476786af21b6e1ae 100644 (file)
@@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
                if (!r)
                        goto out;
 
-               r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
-                                            sizeof(u64), GFP_KERNEL);
-               if (!r->r.fmr.physaddrs)
+               r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+                                          sizeof(u64), GFP_KERNEL);
+               if (!r->fmr.physaddrs)
                        goto out_free;
 
-               r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
-               if (IS_ERR(r->r.fmr.fmr))
+               r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+               if (IS_ERR(r->fmr.fmr))
                        goto out_fmr_err;
 
                list_add(&r->mw_list, &buf->rb_mws);
@@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
        return 0;
 
 out_fmr_err:
-       rc = PTR_ERR(r->r.fmr.fmr);
+       rc = PTR_ERR(r->fmr.fmr);
        dprintk("RPC:       %s: ib_alloc_fmr status %i\n", __func__, rc);
-       kfree(r->r.fmr.physaddrs);
+       kfree(r->fmr.physaddrs);
 out_free:
        kfree(r);
 out:
@@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r)
 {
        LIST_HEAD(l);
 
-       list_add(&r->r.fmr.fmr->list, &l);
+       list_add(&r->fmr.fmr->list, &l);
        return ib_unmap_fmr(&l);
 }
 
@@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                nsegs = RPCRDMA_MAX_FMR_SGES;
        for (i = 0; i < nsegs;) {
                rpcrdma_map_one(device, seg, direction);
-               mw->r.fmr.physaddrs[i] = seg->mr_dma;
+               mw->fmr.physaddrs[i] = seg->mr_dma;
                len += seg->mr_len;
                ++seg;
                ++i;
@@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                        break;
        }
 
-       rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+       rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
                             i, seg1->mr_dma);
        if (rc)
                goto out_maperr;
 
        seg1->rl_mw = mw;
-       seg1->mr_rkey = mw->r.fmr.fmr->rkey;
+       seg1->mr_rkey = mw->fmr.fmr->rkey;
        seg1->mr_base = seg1->mr_dma + pageoff;
        seg1->mr_nsegs = i;
        seg1->mr_len = len;
@@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
                seg = &req->rl_segments[i];
                mw = seg->rl_mw;
 
-               list_add(&mw->r.fmr.fmr->list, &unmap_list);
+               list_add(&mw->fmr.fmr->list, &unmap_list);
 
                i += seg->mr_nsegs;
        }
@@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
        while (!list_empty(&buf->rb_all)) {
                r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
                list_del(&r->mw_all);
-               kfree(r->r.fmr.physaddrs);
+               kfree(r->fmr.physaddrs);
 
-               rc = ib_dealloc_fmr(r->r.fmr.fmr);
+               rc = ib_dealloc_fmr(r->fmr.fmr);
                if (rc)
                        dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
                                __func__, rc);
index e16567389e28f5eba1359ddf91802d63c4612c26..c250924a9fd3c6489a123a46ee9f5dc6ae67366e 100644 (file)
@@ -109,20 +109,20 @@ static void
 __frwr_recovery_worker(struct work_struct *work)
 {
        struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-                                           r.frmr.fr_work);
-       struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+                                           frmr.fr_work);
+       struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
        unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
        struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
 
-       if (ib_dereg_mr(r->r.frmr.fr_mr))
+       if (ib_dereg_mr(r->frmr.fr_mr))
                goto out_fail;
 
-       r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-       if (IS_ERR(r->r.frmr.fr_mr))
+       r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
+       if (IS_ERR(r->frmr.fr_mr))
                goto out_fail;
 
        dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
-       r->r.frmr.fr_state = FRMR_IS_INVALID;
+       r->frmr.fr_state = FRMR_IS_INVALID;
        rpcrdma_put_mw(r_xprt, r);
        return;
 
@@ -137,15 +137,15 @@ out_fail:
 static void
 __frwr_queue_recovery(struct rpcrdma_mw *r)
 {
-       INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
-       queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+       INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
+       queue_work(frwr_recovery_wq, &r->frmr.fr_work);
 }
 
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
            unsigned int depth)
 {
-       struct rpcrdma_frmr *f = &r->r.frmr;
+       struct rpcrdma_frmr *f = &r->frmr;
        int rc;
 
        f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
@@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 
        sg_init_table(f->sg, depth);
 
+       init_completion(&f->fr_linv_done);
+
        return 0;
 
 out_mr_err:
@@ -179,11 +181,11 @@ __frwr_release(struct rpcrdma_mw *r)
 {
        int rc;
 
-       rc = ib_dereg_mr(r->r.frmr.fr_mr);
+       rc = ib_dereg_mr(r->frmr.fr_mr);
        if (rc)
                dprintk("RPC:       %s: ib_dereg_mr status %i\n",
                        __func__, rc);
-       kfree(r->r.frmr.sg);
+       kfree(r->frmr.sg);
 }
 
 static int
@@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
                     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
 }
 
-/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
- * to be reset.
+static void
+__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
+                           const char *wr)
+{
+       frmr->fr_state = FRMR_IS_STALE;
+       if (wc->status != IB_WC_WR_FLUSH_ERR)
+               pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
+                      wr, ib_wc_status_msg(wc->status),
+                      wc->status, wc->vendor_err);
+}
+
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
  *
- * WARNING: Only wr_id and status are reliable at this point
  */
 static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
+frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 {
-       if (likely(wc->status == IB_WC_SUCCESS))
-               return;
-
-       /* WARNING: Only wr_id and status are reliable at this point */
-       r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-       if (wc->status == IB_WC_WR_FLUSH_ERR)
-               dprintk("RPC:       %s: frmr %p flushed\n", __func__, r);
-       else
-               pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
-                       __func__, r, ib_wc_status_msg(wc->status), wc->status);
+       struct rpcrdma_frmr *frmr;
+       struct ib_cqe *cqe;
 
-       r->r.frmr.fr_state = FRMR_IS_STALE;
+       /* WARNING: Only wr_cqe and status are reliable at this point */
+       if (wc->status != IB_WC_SUCCESS) {
+               cqe = wc->wr_cqe;
+               frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+               __frwr_sendcompletion_flush(wc, frmr, "fastreg");
+       }
 }
 
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
+ *
+ */
 static void
-frwr_sendcompletion(struct ib_wc *wc)
+frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-       struct rpcrdma_frmr *f = &r->r.frmr;
+       struct rpcrdma_frmr *frmr;
+       struct ib_cqe *cqe;
 
-       if (unlikely(wc->status != IB_WC_SUCCESS))
-               __frwr_sendcompletion_flush(wc, r);
+       /* WARNING: Only wr_cqe and status are reliable at this point */
+       if (wc->status != IB_WC_SUCCESS) {
+               cqe = wc->wr_cqe;
+               frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+               __frwr_sendcompletion_flush(wc, frmr, "localinv");
+       }
+}
 
-       if (f->fr_waiter)
-               complete(&f->fr_linv_done);
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled LocalInv WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void
+frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct rpcrdma_frmr *frmr;
+       struct ib_cqe *cqe;
+
+       /* WARNING: Only wr_cqe and status are reliable at this point */
+       cqe = wc->wr_cqe;
+       frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
+       if (wc->status != IB_WC_SUCCESS)
+               __frwr_sendcompletion_flush(wc, frmr, "localinv");
+       complete_all(&frmr->fr_linv_done);
 }
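The three handlers above all use the same new-CQ-API idiom: the WR owner
embeds a struct ib_cqe, points wr_cqe at it before posting, and the
completion handler recovers the owner with container_of(). A generic
sketch (names are illustrative):

struct example_ctx {
	struct ib_cqe	cqe;	/* set cqe.done = example_done before posting */
	/* ... owner state ... */
};

static void example_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct example_ctx *ctx =
		container_of(wc->wr_cqe, struct example_ctx, cqe);

	/* On error, only wr_cqe and wc->status are reliable */
	if (wc->status != IB_WC_SUCCESS)
		pr_err("example: ctx %p: %s\n",
		       ctx, ib_wc_status_msg(wc->status));
}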
 
 static int
@@ -313,8 +352,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
 
                list_add(&r->mw_list, &buf->rb_mws);
                list_add(&r->mw_all, &buf->rb_all);
-               r->mw_sendcompletion = frwr_sendcompletion;
-               r->r.frmr.fr_xprt = r_xprt;
+               r->frmr.fr_xprt = r_xprt;
        }
 
        return 0;
@@ -347,10 +385,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
                mw = rpcrdma_get_mw(r_xprt);
                if (!mw)
                        return -ENOMEM;
-       } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
-       frmr = &mw->r.frmr;
+       } while (mw->frmr.fr_state != FRMR_IS_INVALID);
+       frmr = &mw->frmr;
        frmr->fr_state = FRMR_IS_VALID;
-       frmr->fr_waiter = false;
        mr = frmr->fr_mr;
        reg_wr = &frmr->fr_regwr;
 
@@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
        reg_wr->wr.next = NULL;
        reg_wr->wr.opcode = IB_WR_REG_MR;
-       reg_wr->wr.wr_id = (uintptr_t)mw;
+       frmr->fr_cqe.done = frwr_wc_fastreg;
+       reg_wr->wr.wr_cqe = &frmr->fr_cqe;
        reg_wr->wr.num_sge = 0;
        reg_wr->wr.send_flags = 0;
        reg_wr->mr = mr;
@@ -434,15 +472,15 @@ static struct ib_send_wr *
 __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
 {
        struct rpcrdma_mw *mw = seg->rl_mw;
-       struct rpcrdma_frmr *f = &mw->r.frmr;
+       struct rpcrdma_frmr *f = &mw->frmr;
        struct ib_send_wr *invalidate_wr;
 
-       f->fr_waiter = false;
        f->fr_state = FRMR_IS_INVALID;
        invalidate_wr = &f->fr_invwr;
 
        memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-       invalidate_wr->wr_id = (unsigned long)(void *)mw;
+       f->fr_cqe.done = frwr_wc_localinv;
+       invalidate_wr->wr_cqe = &f->fr_cqe;
        invalidate_wr->opcode = IB_WR_LOCAL_INV;
        invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
 
@@ -455,7 +493,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 {
        struct ib_device *device = r_xprt->rx_ia.ri_device;
        struct rpcrdma_mw *mw = seg->rl_mw;
-       struct rpcrdma_frmr *f = &mw->r.frmr;
+       struct rpcrdma_frmr *f = &mw->frmr;
 
        seg->rl_mw = NULL;
 
@@ -504,15 +542,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 
                i += seg->mr_nsegs;
        }
-       f = &seg->rl_mw->r.frmr;
+       f = &seg->rl_mw->frmr;
 
        /* Strong send queue ordering guarantees that when the
         * last WR in the chain completes, all WRs in the chain
         * are complete.
         */
        f->fr_invwr.send_flags = IB_SEND_SIGNALED;
-       f->fr_waiter = true;
-       init_completion(&f->fr_linv_done);
+       f->fr_cqe.done = frwr_wc_localinv_wake;
+       reinit_completion(&f->fr_linv_done);
        INIT_CQCOUNT(&r_xprt->rx_ep);
 
        /* Transport disconnect drains the receive CQ before it
@@ -520,14 +558,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
         * unless ri_id->qp is a valid pointer.
         */
        rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
-       if (rc)
+       if (rc) {
                pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
+               rdma_disconnect(ia->ri_id);
+               goto unmap;
+       }
 
        wait_for_completion(&f->fr_linv_done);
 
        /* ORDER: Now DMA unmap all of the req's MRs, and return
         * them to the free MW list.
         */
+unmap:
        for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
                seg = &req->rl_segments[i];
 
@@ -549,7 +591,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
        struct rpcrdma_mr_seg *seg1 = seg;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw = seg1->rl_mw;
-       struct rpcrdma_frmr *frmr = &mw->r.frmr;
+       struct rpcrdma_frmr *frmr = &mw->frmr;
        struct ib_send_wr *invalidate_wr, *bad_wr;
        int rc, nsegs = seg->mr_nsegs;
 
@@ -557,10 +599,11 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
 
        seg1->rl_mw = NULL;
        frmr->fr_state = FRMR_IS_INVALID;
-       invalidate_wr = &mw->r.frmr.fr_invwr;
+       invalidate_wr = &mw->frmr.fr_invwr;
 
        memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-       invalidate_wr->wr_id = (uintptr_t)mw;
+       frmr->fr_cqe.done = frwr_wc_localinv;
+       invalidate_wr->wr_cqe = &frmr->fr_cqe;
        invalidate_wr->opcode = IB_WR_LOCAL_INV;
        invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
index dbb302ecf59012702f03ca1b77b8efdd9624d52b..481b9b6f4a150e9ed9d76f7cc4b187a59cc43b91 100644 (file)
@@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
        seg->mr_rkey = ia->ri_dma_mr->rkey;
        seg->mr_base = seg->mr_dma;
-       seg->mr_nsegs = 1;
        return 1;
 }
 
index 0f28f2d743eda144dcd6cde7bd0a4fdb118bccc2..888823bb6dae40d3c3143823110237ce69741a7a 100644 (file)
@@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
        return tlen;
 }
 
+/* Split "vec" on page boundaries into segments. FMR registers pages,
+ * not a byte range. Other modes coalesce these segments into a single
+ * MR when they can.
+ */
+static int
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+                    int n, int nsegs)
+{
+       size_t page_offset;
+       u32 remaining;
+       char *base;
+
+       base = vec->iov_base;
+       page_offset = offset_in_page(base);
+       remaining = vec->iov_len;
+       while (remaining && n < nsegs) {
+               seg[n].mr_page = NULL;
+               seg[n].mr_offset = base;
+               seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+               remaining -= seg[n].mr_len;
+               base += seg[n].mr_len;
+               ++n;
+               page_offset = 0;
+       }
+       return n;
+}
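A worked example of the split, assuming PAGE_SIZE is 4096:

/* A kvec with iov_len 10000 whose iov_base sits at page offset 3000
 * yields four segments:
 *
 *	seg[n+0].mr_len = 4096 - 3000 = 1096	(rest of the first page)
 *	seg[n+1].mr_len = 4096			(full page)
 *	seg[n+2].mr_len = 4096			(full page)
 *	seg[n+3].mr_len = 712			(10000 - 1096 - 2 * 4096)
 *
 * so FMR, which registers whole pages, never sees a segment that
 * crosses a page boundary.
 */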
+
 /*
  * Chunk assembly from upper layer xdr_buf.
  *
@@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
        int page_base;
        struct page **ppages;
 
-       if (pos == 0 && xdrbuf->head[0].iov_len) {
-               seg[n].mr_page = NULL;
-               seg[n].mr_offset = xdrbuf->head[0].iov_base;
-               seg[n].mr_len = xdrbuf->head[0].iov_len;
-               ++n;
+       if (pos == 0) {
+               n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
+               if (n == nsegs)
+                       return -EIO;
        }
 
        len = xdrbuf->page_len;
@@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
                 * xdr pad bytes, saving the server an RDMA operation. */
                if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
                        return n;
+               n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
                if (n == nsegs)
-                       /* Tail remains, but we're out of segments */
                        return -EIO;
-               seg[n].mr_page = NULL;
-               seg[n].mr_offset = xdrbuf->tail[0].iov_base;
-               seg[n].mr_len = xdrbuf->tail[0].iov_len;
-               ++n;
        }
 
        return n;
@@ -773,20 +795,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        __be32 *iptr;
-       int rdmalen, status;
+       int rdmalen, status, rmerr;
        unsigned long cwnd;
-       u32 credits;
 
        dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
        if (rep->rr_len == RPCRDMA_BAD_LEN)
                goto out_badstatus;
-       if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+       if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
                goto out_shortreply;
 
        headerp = rdmab_to_msg(rep->rr_rdmabuf);
-       if (headerp->rm_vers != rpcrdma_version)
-               goto out_badversion;
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        if (rpcrdma_is_bcall(headerp))
                goto out_bcall;
@@ -809,15 +828,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
         */
        list_del_init(&rqst->rq_list);
        spin_unlock_bh(&xprt->transport_lock);
-       dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
-               "                   RPC request 0x%p xid 0x%08x\n",
-                       __func__, rep, req, rqst,
-                       be32_to_cpu(headerp->rm_xid));
+       dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
+               __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
        /* from here on, the reply is no longer an orphan */
        req->rl_reply = rep;
        xprt->reestablish_timeout = 0;
 
+       if (headerp->rm_vers != rpcrdma_version)
+               goto out_badversion;
+
        /* check for expected message types */
        /* The order of some of these tests is important. */
        switch (headerp->rm_type) {
@@ -878,6 +898,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
                status = rdmalen;
                break;
 
+       case rdma_error:
+               goto out_rdmaerr;
+
 badheader:
        default:
                dprintk("%s: invalid rpcrdma reply header (type %d):"
@@ -893,6 +916,7 @@ badheader:
                break;
        }
 
+out:
        /* Invalidate and flush the data payloads before waking the
         * waiting application. This guarantees the memory region is
         * properly fenced from the server before the application
@@ -903,15 +927,9 @@ badheader:
        if (req->rl_nchunks)
                r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
 
-       credits = be32_to_cpu(headerp->rm_credit);
-       if (credits == 0)
-               credits = 1;    /* don't deadlock */
-       else if (credits > r_xprt->rx_buf.rb_max_requests)
-               credits = r_xprt->rx_buf.rb_max_requests;
-
        spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
-       xprt->cwnd = credits << RPC_CWNDSHIFT;
+       xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(rqst->rq_task);
 
@@ -935,13 +953,43 @@ out_bcall:
        return;
 #endif
 
-out_shortreply:
-       dprintk("RPC:       %s: short/invalid reply\n", __func__);
-       goto repost;
-
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
 out_badversion:
        dprintk("RPC:       %s: invalid version %d\n",
                __func__, be32_to_cpu(headerp->rm_vers));
+       status = -EIO;
+       r_xprt->rx_stats.bad_reply_count++;
+       goto out;
+
+out_rdmaerr:
+       rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
+       switch (rmerr) {
+       case ERR_VERS:
+               pr_err("%s: server reports header version error (%u-%u)\n",
+                      __func__,
+                      be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
+                      be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
+               break;
+       case ERR_CHUNK:
+               pr_err("%s: server reports header decoding error\n",
+                      __func__);
+               break;
+       default:
+               pr_err("%s: server reports unknown error %d\n",
+                      __func__, rmerr);
+       }
+       status = -EREMOTEIO;
+       r_xprt->rx_stats.bad_reply_count++;
+       goto out;
+
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
+out_shortreply:
+       dprintk("RPC:       %s: short/invalid reply\n", __func__);
        goto repost;
 
 out_nomatch:
index 878f1bfb1db98f6972e641130d36a80cdf97faf8..f5ed9f982cd71b12606d7f8953a6283447509029 100644 (file)
@@ -112,89 +112,65 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
        }
 }
 
+/**
+ * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
+ *
+ */
 static void
-rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
+rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct rpcrdma_ep *ep = context;
-
-       pr_err("RPC:       %s: %s on device %s ep %p\n",
-              __func__, ib_event_msg(event->event),
-               event->device->name, context);
-       if (ep->rep_connected == 1) {
-               ep->rep_connected = -EIO;
-               rpcrdma_conn_func(ep);
-               wake_up_all(&ep->rep_connect_wait);
-       }
+       /* WARNING: Only wr_cqe and status are reliable at this point */
+       if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
+               pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
+                      ib_wc_status_msg(wc->status),
+                      wc->status, wc->vendor_err);
 }
 
 static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+rpcrdma_receive_worker(struct work_struct *work)
 {
-       /* WARNING: Only wr_id and status are reliable at this point */
-       if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
-               if (wc->status != IB_WC_SUCCESS &&
-                   wc->status != IB_WC_WR_FLUSH_ERR)
-                       pr_err("RPC:       %s: SEND: %s\n",
-                              __func__, ib_wc_status_msg(wc->status));
-       } else {
-               struct rpcrdma_mw *r;
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
 
-               r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-               r->mw_sendcompletion(wc);
-       }
+       rpcrdma_reply_handler(rep);
 }
 
-/* The common case is a single send completion is waiting. By
- * passing two WC entries to ib_poll_cq, a return code of 1
- * means there is exactly one WC waiting and no more. We don't
- * have to invoke ib_poll_cq again to know that the CQ has been
- * properly drained.
+/* Perform basic sanity checking to avoid using garbage
+ * to update the credit grant value.
  */
 static void
-rpcrdma_sendcq_poll(struct ib_cq *cq)
+rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 {
-       struct ib_wc *pos, wcs[2];
-       int count, rc;
+       struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
+       struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+       u32 credits;
 
-       do {
-               pos = wcs;
+       if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+               return;
 
-               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-               if (rc < 0)
-                       break;
+       credits = be32_to_cpu(rmsgp->rm_credit);
+       if (credits == 0)
+               credits = 1;    /* don't deadlock */
+       else if (credits > buffer->rb_max_requests)
+               credits = buffer->rb_max_requests;
 
-               count = rc;
-               while (count-- > 0)
-                       rpcrdma_sendcq_process_wc(pos++);
-       } while (rc == ARRAY_SIZE(wcs));
-       return;
+       atomic_set(&buffer->rb_credits, credits);
 }
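A worked example of the clamp above, assuming rb_max_requests is 32:

/*	rm_credit == 0		->  rb_credits = 1	(avoid deadlock)
 *	rm_credit == 16		->  rb_credits = 16
 *	rm_credit == 4096	->  rb_credits = 32	(cap at our slots)
 *
 * The reply handler then derives the congestion window from the most
 * recent grant: xprt->cwnd = rb_credits << RPC_CWNDSHIFT.
 */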
 
-/* Handle provider send completion upcalls.
+/**
+ * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * @cq:        completion queue (ignored)
+ * @wc:        completed WR
+ *
  */
 static void
-rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 {
-       do {
-               rpcrdma_sendcq_poll(cq);
-       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
-static void
-rpcrdma_receive_worker(struct work_struct *work)
-{
-       struct rpcrdma_rep *rep =
-                       container_of(work, struct rpcrdma_rep, rr_work);
-
-       rpcrdma_reply_handler(rep);
-}
-
-static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc)
-{
-       struct rpcrdma_rep *rep =
-                       (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
+                                              rr_cqe);
 
        /* WARNING: Only wr_id and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS)
@@ -211,7 +187,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
        ib_dma_sync_single_for_cpu(rep->rr_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rep->rr_len, DMA_FROM_DEVICE);
-       prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+       rpcrdma_update_granted_credits(rep);
 
 out_schedule:
        queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -219,57 +196,20 @@ out_schedule:
 
 out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
-               pr_err("RPC:       %s: rep %p: %s\n",
-                      __func__, rep, ib_wc_status_msg(wc->status));
+               pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
+                      ib_wc_status_msg(wc->status),
+                      wc->status, wc->vendor_err);
        rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
 }
 
-/* The wc array is on stack: automatic memory is always CPU-local.
- *
- * struct ib_wc is 64 bytes, making the poll array potentially
- * large. But this is at the bottom of the call chain. Further
- * substantial work is done in another thread.
- */
-static void
-rpcrdma_recvcq_poll(struct ib_cq *cq)
-{
-       struct ib_wc *pos, wcs[4];
-       int count, rc;
-
-       do {
-               pos = wcs;
-
-               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
-               if (rc < 0)
-                       break;
-
-               count = rc;
-               while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(pos++);
-       } while (rc == ARRAY_SIZE(wcs));
-}
-
-/* Handle provider receive completion upcalls.
- */
-static void
-rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
-{
-       do {
-               rpcrdma_recvcq_poll(cq);
-       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
-                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
-}
-
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
        struct ib_wc wc;
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc);
-       while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
-               rpcrdma_sendcq_process_wc(&wc);
+               rpcrdma_receive_wc(NULL, &wc);
 }
 
 static int
@@ -330,6 +270,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 connected:
                dprintk("RPC:       %s: %sconnected\n",
                                        __func__, connstate > 0 ? "" : "dis");
+               atomic_set(&xprt->rx_buf.rb_credits, 1);
                ep->rep_connected = connstate;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
@@ -560,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
 {
        struct ib_cq *sendcq, *recvcq;
-       struct ib_cq_init_attr cq_attr = {};
        unsigned int max_qp_wr;
-       int rc, err;
+       int rc;
 
        if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
                dprintk("RPC:       %s: insufficient sge's available\n",
@@ -614,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-       cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
-       sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+       sendcq = ib_alloc_cq(ia->ri_device, NULL,
+                            ep->rep_attr.cap.max_send_wr + 1,
+                            0, IB_POLL_SOFTIRQ);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -624,16 +564,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                goto out1;
        }
 
-       rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
-       if (rc) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               goto out2;
-       }
-
-       cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
-       recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
+       recvcq = ib_alloc_cq(ia->ri_device, NULL,
+                            ep->rep_attr.cap.max_recv_wr + 1,
+                            0, IB_POLL_SOFTIRQ);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -641,14 +574,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                goto out2;
        }
 
-       rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
-       if (rc) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               ib_destroy_cq(recvcq);
-               goto out2;
-       }
-
        ep->rep_attr.send_cq = sendcq;
        ep->rep_attr.recv_cq = recvcq;
 
@@ -673,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        return 0;
 
 out2:
-       err = ib_destroy_cq(sendcq);
-       if (err)
-               dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-                       __func__, err);
+       ib_free_cq(sendcq);
 out1:
        if (ia->ri_dma_mr)
                ib_dereg_mr(ia->ri_dma_mr);
@@ -711,15 +633,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                ia->ri_id->qp = NULL;
        }
 
-       rc = ib_destroy_cq(ep->rep_attr.recv_cq);
-       if (rc)
-               dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-                       __func__, rc);
-
-       rc = ib_destroy_cq(ep->rep_attr.send_cq);
-       if (rc)
-               dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
-                       __func__, rc);
+       ib_free_cq(ep->rep_attr.recv_cq);
+       ib_free_cq(ep->rep_attr.send_cq);
 
        if (ia->ri_dma_mr) {
                rc = ib_dereg_mr(ia->ri_dma_mr);
@@ -898,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
        spin_lock(&buffer->rb_reqslock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
        spin_unlock(&buffer->rb_reqslock);
+       req->rl_cqe.done = rpcrdma_wc_send;
        req->rl_buffer = &r_xprt->rx_buf;
        return req;
 }
@@ -923,6 +839,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
        }
 
        rep->rr_device = ia->ri_device;
+       rep->rr_cqe.done = rpcrdma_receive_wc;
        rep->rr_rxprt = r_xprt;
        INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
@@ -943,6 +860,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        buf->rb_max_requests = r_xprt->rx_data.max_requests;
        buf->rb_bc_srv_max_requests = 0;
        spin_lock_init(&buf->rb_lock);
+       atomic_set(&buf->rb_credits, 1);
 
        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
@@ -1259,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
        }
 
        send_wr.next = NULL;
-       send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+       send_wr.wr_cqe = &req->rl_cqe;
        send_wr.sg_list = iov;
        send_wr.num_sge = req->rl_niovs;
        send_wr.opcode = IB_WR_SEND;
@@ -1297,7 +1215,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        int rc;
 
        recv_wr.next = NULL;
-       recv_wr.wr_id = (u64) (unsigned long) rep;
+       recv_wr.wr_cqe = &rep->rr_cqe;
        recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        recv_wr.num_sge = 1;
 
index 38fe11b0987528c3d345676202db487cae269a87..2ebc743cb96f4835205550fa210ecef37d55963a 100644 (file)
@@ -95,10 +95,6 @@ struct rpcrdma_ep {
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
-/* Force completion handler to ignore the signal
- */
-#define RPCRDMA_IGNORE_COMPLETION      (0ULL)
-
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are
  * allocated when the forward channel is set up.
@@ -171,6 +167,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 struct rpcrdma_buffer;
 
 struct rpcrdma_rep {
+       struct ib_cqe           rr_cqe;
        unsigned int            rr_len;
        struct ib_device        *rr_device;
        struct rpcrdma_xprt     *rr_rxprt;
@@ -204,11 +201,11 @@ struct rpcrdma_frmr {
        struct scatterlist              *sg;
        int                             sg_nents;
        struct ib_mr                    *fr_mr;
+       struct ib_cqe                   fr_cqe;
        enum rpcrdma_frmr_state         fr_state;
+       struct completion               fr_linv_done;
        struct work_struct              fr_work;
        struct rpcrdma_xprt             *fr_xprt;
-       bool                            fr_waiter;
-       struct completion               fr_linv_done;;
        union {
                struct ib_reg_wr        fr_regwr;
                struct ib_send_wr       fr_invwr;
@@ -224,8 +221,7 @@ struct rpcrdma_mw {
        union {
                struct rpcrdma_fmr      fmr;
                struct rpcrdma_frmr     frmr;
-       } r;
-       void                    (*mw_sendcompletion)(struct ib_wc *);
+       };
        struct list_head        mw_list;
        struct list_head        mw_all;
 };
@@ -281,6 +277,7 @@ struct rpcrdma_req {
        struct rpcrdma_regbuf   *rl_sendbuf;
        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
 
+       struct ib_cqe           rl_cqe;
        struct list_head        rl_all;
        bool                    rl_backchannel;
 };
@@ -311,6 +308,7 @@ struct rpcrdma_buffer {
        struct list_head        rb_send_bufs;
        struct list_head        rb_recv_bufs;
        u32                     rb_max_requests;
+       atomic_t                rb_credits;     /* most recent credit grant */
 
        u32                     rb_bc_srv_max_requests;
        spinlock_t              rb_reqslock;    /* protect rb_allreqs */
index fde2138b81e7dfc99a33ef42ce5d11ded58a0617..65e759569e4873619735b966cd055b9d634d2d3a 100644 (file)
@@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
  */
 static void xs_local_rpcbind(struct rpc_task *task)
 {
-       rcu_read_lock();
-       xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
-       rcu_read_unlock();
+       xprt_set_bound(task->tk_xprt);
 }
 
 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)