nfs41: close sequence setup/done support
[linux-2.6-block.git] / fs / nfs / nfs4proc.c
index 4674f8092da8aaa52ae4af6835c6b150341efe66..a34fada51446b9e753c0a8727354216e006a35f4 100644 (file)
@@ -271,6 +271,317 @@ static void renew_lease(const struct nfs_server *server, unsigned long timestamp
        spin_unlock(&clp->cl_lock);
 }
 
+#if defined(CONFIG_NFS_V4_1)
+
+/*
+ * nfs4_free_slot - free a slot and efficiently update slot table.
+ *
+ * freeing a slot is trivially done by clearing its respective bit
+ * in the bitmap.
+ * If the freed slotid equals highest_used_slotid we want to update it
+ * so that the server would be able to size down the slot table if needed,
+ * otherwise we know that the highest_used_slotid is still in use.
+ * When updating highest_used_slotid there may be "holes" in the bitmap
+ * so we need to scan down from highest_used_slotid to 0 looking for the now
+ * highest slotid in use.
+ * If none found, highest_used_slotid is set to -1.
+ */
+static void
+nfs4_free_slot(struct nfs4_slot_table *tbl, u8 free_slotid)
+{
+       int slotid = free_slotid;
+
+       spin_lock(&tbl->slot_tbl_lock);
+       /* clear used bit in bitmap */
+       __clear_bit(slotid, tbl->used_slots);
+
+       /* update highest_used_slotid when it is freed */
+       if (slotid == tbl->highest_used_slotid) {
+               slotid = find_last_bit(tbl->used_slots, tbl->max_slots);
+               if (slotid >= 0 && slotid < tbl->max_slots)
+                       tbl->highest_used_slotid = slotid;
+               else
+                       tbl->highest_used_slotid = -1;
+       }
+       rpc_wake_up_next(&tbl->slot_tbl_waitq);
+       spin_unlock(&tbl->slot_tbl_lock);
+       dprintk("%s: free_slotid %u highest_used_slotid %d\n", __func__,
+               free_slotid, tbl->highest_used_slotid);
+}
+
+void nfs41_sequence_free_slot(const struct nfs_client *clp,
+                             struct nfs4_sequence_res *res)
+{
+       struct nfs4_slot_table *tbl;
+
+       if (!nfs4_has_session(clp)) {
+               dprintk("%s: No session\n", __func__);
+               return;
+       }
+       tbl = &clp->cl_session->fc_slot_table;
+       if (res->sr_slotid == NFS4_MAX_SLOT_TABLE) {
+               dprintk("%s: No slot\n", __func__);
+               /* just wake up the next guy waiting since
+                * we may have not consumed a slot after all */
+               rpc_wake_up_next(&tbl->slot_tbl_waitq);
+               return;
+       }
+       nfs4_free_slot(tbl, res->sr_slotid);
+       res->sr_slotid = NFS4_MAX_SLOT_TABLE;
+}
+
+static void nfs41_sequence_done(struct nfs_client *clp,
+                               struct nfs4_sequence_res *res,
+                               int rpc_status)
+{
+       unsigned long timestamp;
+       struct nfs4_slot_table *tbl;
+       struct nfs4_slot *slot;
+
+       /*
+        * sr_status remains 1 if an RPC level error occurred. The server
+        * may or may not have processed the sequence operation..
+        * Proceed as if the server received and processed the sequence
+        * operation.
+        */
+       if (res->sr_status == 1)
+               res->sr_status = NFS_OK;
+
+       /* -ERESTARTSYS can result in skipping nfs41_sequence_setup */
+       if (res->sr_slotid == NFS4_MAX_SLOT_TABLE)
+               goto out;
+
+       tbl = &clp->cl_session->fc_slot_table;
+       slot = tbl->slots + res->sr_slotid;
+
+       if (res->sr_status == 0) {
+               /* Update the slot's sequence and clientid lease timer */
+               ++slot->seq_nr;
+               timestamp = res->sr_renewal_time;
+               spin_lock(&clp->cl_lock);
+               if (time_before(clp->cl_last_renewal, timestamp))
+                       clp->cl_last_renewal = timestamp;
+               spin_unlock(&clp->cl_lock);
+               return;
+       }
+out:
+       /* The session may be reset by one of the error handlers. */
+       dprintk("%s: Error %d free the slot \n", __func__, res->sr_status);
+       nfs41_sequence_free_slot(clp, res);
+}
+
+/*
+ * nfs4_find_slot - efficiently look for a free slot
+ *
+ * nfs4_find_slot looks for an unset bit in the used_slots bitmap.
+ * If found, we mark the slot as used, update the highest_used_slotid,
+ * and respectively set up the sequence operation args.
+ * The slot number is returned if found, or NFS4_MAX_SLOT_TABLE otherwise.
+ *
+ * Note: must be called with under the slot_tbl_lock.
+ */
+static u8
+nfs4_find_slot(struct nfs4_slot_table *tbl, struct rpc_task *task)
+{
+       int slotid;
+       u8 ret_id = NFS4_MAX_SLOT_TABLE;
+       BUILD_BUG_ON((u8)NFS4_MAX_SLOT_TABLE != (int)NFS4_MAX_SLOT_TABLE);
+
+       dprintk("--> %s used_slots=%04lx highest_used=%d max_slots=%d\n",
+               __func__, tbl->used_slots[0], tbl->highest_used_slotid,
+               tbl->max_slots);
+       slotid = find_first_zero_bit(tbl->used_slots, tbl->max_slots);
+       if (slotid >= tbl->max_slots)
+               goto out;
+       __set_bit(slotid, tbl->used_slots);
+       if (slotid > tbl->highest_used_slotid)
+               tbl->highest_used_slotid = slotid;
+       ret_id = slotid;
+out:
+       dprintk("<-- %s used_slots=%04lx highest_used=%d slotid=%d \n",
+               __func__, tbl->used_slots[0], tbl->highest_used_slotid, ret_id);
+       return ret_id;
+}
+
+static int nfs41_setup_sequence(struct nfs4_session *session,
+                               struct nfs4_sequence_args *args,
+                               struct nfs4_sequence_res *res,
+                               int cache_reply,
+                               struct rpc_task *task)
+{
+       struct nfs4_slot *slot;
+       struct nfs4_slot_table *tbl;
+       u8 slotid;
+
+       dprintk("--> %s\n", __func__);
+       /* slot already allocated? */
+       if (res->sr_slotid != NFS4_MAX_SLOT_TABLE)
+               return 0;
+
+       memset(res, 0, sizeof(*res));
+       res->sr_slotid = NFS4_MAX_SLOT_TABLE;
+       tbl = &session->fc_slot_table;
+
+       spin_lock(&tbl->slot_tbl_lock);
+       slotid = nfs4_find_slot(tbl, task);
+       if (slotid == NFS4_MAX_SLOT_TABLE) {
+               rpc_sleep_on(&tbl->slot_tbl_waitq, task, NULL);
+               spin_unlock(&tbl->slot_tbl_lock);
+               dprintk("<-- %s: no free slots\n", __func__);
+               return -EAGAIN;
+       }
+       spin_unlock(&tbl->slot_tbl_lock);
+
+       slot = tbl->slots + slotid;
+       args->sa_slotid = slotid;
+       args->sa_cache_this = cache_reply;
+
+       dprintk("<-- %s slotid=%d seqid=%d\n", __func__, slotid, slot->seq_nr);
+
+       res->sr_slotid = slotid;
+       res->sr_renewal_time = jiffies;
+       /*
+        * sr_status is only set in decode_sequence, and so will remain
+        * set to 1 if an rpc level failure occurs.
+        */
+       res->sr_status = 1;
+       return 0;
+}
+
+int nfs4_setup_sequence(struct nfs_client *clp,
+                       struct nfs4_sequence_args *args,
+                       struct nfs4_sequence_res *res,
+                       int cache_reply,
+                       struct rpc_task *task)
+{
+       int ret = 0;
+
+       dprintk("--> %s clp %p session %p sr_slotid %d\n",
+               __func__, clp, clp->cl_session, res->sr_slotid);
+
+       if (!nfs4_has_session(clp))
+               goto out;
+       ret = nfs41_setup_sequence(clp->cl_session, args, res, cache_reply,
+                                  task);
+       if (ret != -EAGAIN) {
+               /* terminate rpc task */
+               task->tk_status = ret;
+               task->tk_action = NULL;
+       }
+out:
+       dprintk("<-- %s status=%d\n", __func__, ret);
+       return ret;
+}
+
+struct nfs41_call_sync_data {
+       struct nfs_client *clp;
+       struct nfs4_sequence_args *seq_args;
+       struct nfs4_sequence_res *seq_res;
+       int cache_reply;
+};
+
+static void nfs41_call_sync_prepare(struct rpc_task *task, void *calldata)
+{
+       struct nfs41_call_sync_data *data = calldata;
+
+       dprintk("--> %s data->clp->cl_session %p\n", __func__,
+               data->clp->cl_session);
+       if (nfs4_setup_sequence(data->clp, data->seq_args,
+                               data->seq_res, data->cache_reply, task))
+               return;
+       rpc_call_start(task);
+}
+
+static void nfs41_call_sync_done(struct rpc_task *task, void *calldata)
+{
+       struct nfs41_call_sync_data *data = calldata;
+
+       nfs41_sequence_done(data->clp, data->seq_res, task->tk_status);
+       nfs41_sequence_free_slot(data->clp, data->seq_res);
+}
+
+struct rpc_call_ops nfs41_call_sync_ops = {
+       .rpc_call_prepare = nfs41_call_sync_prepare,
+       .rpc_call_done = nfs41_call_sync_done,
+};
+
+static int nfs4_call_sync_sequence(struct nfs_client *clp,
+                                  struct rpc_clnt *clnt,
+                                  struct rpc_message *msg,
+                                  struct nfs4_sequence_args *args,
+                                  struct nfs4_sequence_res *res,
+                                  int cache_reply)
+{
+       int ret;
+       struct rpc_task *task;
+       struct nfs41_call_sync_data data = {
+               .clp = clp,
+               .seq_args = args,
+               .seq_res = res,
+               .cache_reply = cache_reply,
+       };
+       struct rpc_task_setup task_setup = {
+               .rpc_client = clnt,
+               .rpc_message = msg,
+               .callback_ops = &nfs41_call_sync_ops,
+               .callback_data = &data
+       };
+
+       res->sr_slotid = NFS4_MAX_SLOT_TABLE;
+       task = rpc_run_task(&task_setup);
+       if (IS_ERR(task))
+               ret = PTR_ERR(task);
+       else {
+               ret = task->tk_status;
+               rpc_put_task(task);
+       }
+       return ret;
+}
+
+int _nfs4_call_sync_session(struct nfs_server *server,
+                           struct rpc_message *msg,
+                           struct nfs4_sequence_args *args,
+                           struct nfs4_sequence_res *res,
+                           int cache_reply)
+{
+       return nfs4_call_sync_sequence(server->nfs_client, server->client,
+                                      msg, args, res, cache_reply);
+}
+
+#endif /* CONFIG_NFS_V4_1 */
+
+int _nfs4_call_sync(struct nfs_server *server,
+                   struct rpc_message *msg,
+                   struct nfs4_sequence_args *args,
+                   struct nfs4_sequence_res *res,
+                   int cache_reply)
+{
+       args->sa_session = res->sr_session = NULL;
+       return rpc_call_sync(server->client, msg, 0);
+}
+
+#define nfs4_call_sync(server, msg, args, res, cache_reply) \
+       (server)->nfs_client->cl_call_sync((server), (msg), &(args)->seq_args, \
+                       &(res)->seq_res, (cache_reply))
+
+static void nfs4_sequence_done(const struct nfs_server *server,
+                              struct nfs4_sequence_res *res, int rpc_status)
+{
+#ifdef CONFIG_NFS_V4_1
+       if (nfs4_has_session(server->nfs_client))
+               nfs41_sequence_done(server->nfs_client, res, rpc_status);
+#endif /* CONFIG_NFS_V4_1 */
+}
+
+/* no restart, therefore free slot here */
+static void nfs4_sequence_done_free_slot(const struct nfs_server *server,
+                                        struct nfs4_sequence_res *res,
+                                        int rpc_status)
+{
+       nfs4_sequence_done(server, res, rpc_status);
+       nfs4_sequence_free_slot(server->nfs_client, res);
+}
+
 static void update_changeattr(struct inode *dir, struct nfs4_change_info *cinfo)
 {
        struct nfs_inode *nfsi = NFS_I(dir);
@@ -343,6 +654,7 @@ static struct nfs4_opendata *nfs4_opendata_alloc(struct path *path,
        p->o_arg.server = server;
        p->o_arg.bitmask = server->attr_bitmask;
        p->o_arg.claim = NFS4_OPEN_CLAIM_NULL;
+       p->o_res.seq_res.sr_slotid = NFS4_MAX_SLOT_TABLE;
        if (flags & O_EXCL) {
                u32 *s = (u32 *) p->o_arg.u.verifier.data;
                s[0] = jiffies;
@@ -1269,7 +1581,7 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
        } else
                memcpy(&arg.stateid, &zero_stateid, sizeof(arg.stateid));
 
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &arg, &res, 1);
        if (status == 0 && state != NULL)
                renew_lease(server, timestamp);
        return status;
@@ -1318,6 +1630,7 @@ static void nfs4_close_done(struct rpc_task *task, void *data)
        struct nfs4_state *state = calldata->state;
        struct nfs_server *server = NFS_SERVER(calldata->inode);
 
+       nfs4_sequence_done(server, &calldata->res.seq_res, task->tk_status);
        if (RPC_ASSASSINATED(task))
                return;
         /* hmm. we are done with the inode, and in the process of freeing
@@ -1340,6 +1653,7 @@ static void nfs4_close_done(struct rpc_task *task, void *data)
                                return;
                        }
        }
+       nfs4_sequence_free_slot(server->nfs_client, &calldata->res.seq_res);
        nfs_refresh_inode(calldata->inode, calldata->res.fattr);
 }
 
@@ -1380,6 +1694,10 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data)
                calldata->arg.fmode = FMODE_WRITE;
        }
        calldata->timestamp = jiffies;
+       if (nfs4_setup_sequence((NFS_SERVER(calldata->inode))->nfs_client,
+                               &calldata->arg.seq_args, &calldata->res.seq_res,
+                               1, task))
+               return;
        rpc_call_start(task);
 }
 
@@ -1419,7 +1737,7 @@ int nfs4_do_close(struct path *path, struct nfs4_state *state, int wait)
        };
        int status = -ENOMEM;
 
-       calldata = kmalloc(sizeof(*calldata), GFP_KERNEL);
+       calldata = kzalloc(sizeof(*calldata), GFP_KERNEL);
        if (calldata == NULL)
                goto out;
        calldata->inode = state->inode;
@@ -1435,6 +1753,7 @@ int nfs4_do_close(struct path *path, struct nfs4_state *state, int wait)
        calldata->res.fattr = &calldata->fattr;
        calldata->res.seqid = calldata->arg.seqid;
        calldata->res.server = server;
+       calldata->res.seq_res.sr_slotid = NFS4_MAX_SLOT_TABLE;
        calldata->path.mnt = mntget(path->mnt);
        calldata->path.dentry = dget(path->dentry);
 
@@ -1584,15 +1903,18 @@ void nfs4_close_context(struct nfs_open_context *ctx, int is_sync)
 
 static int _nfs4_server_capabilities(struct nfs_server *server, struct nfs_fh *fhandle)
 {
+       struct nfs4_server_caps_arg args = {
+               .fhandle = fhandle,
+       };
        struct nfs4_server_caps_res res = {};
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_SERVER_CAPS],
-               .rpc_argp = fhandle,
+               .rpc_argp = &args,
                .rpc_resp = &res,
        };
        int status;
 
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &args, &res, 0);
        if (status == 0) {
                memcpy(server->attr_bitmask, res.attr_bitmask, sizeof(server->attr_bitmask));
                if (res.attr_bitmask[0] & FATTR4_WORD0_ACL)
@@ -1606,6 +1928,7 @@ static int _nfs4_server_capabilities(struct nfs_server *server, struct nfs_fh *f
                server->cache_consistency_bitmask[1] &= FATTR4_WORD1_TIME_METADATA|FATTR4_WORD1_TIME_MODIFY;
                server->acl_bitmask = res.acl_bitmask;
        }
+
        return status;
 }
 
@@ -1638,7 +1961,7 @@ static int _nfs4_lookup_root(struct nfs_server *server, struct nfs_fh *fhandle,
                .rpc_resp = &res,
        };
        nfs_fattr_init(info->fattr);
-       return rpc_call_sync(server->client, &msg, 0);
+       return nfs4_call_sync(server, &msg, &args, &res, 0);
 }
 
 static int nfs4_lookup_root(struct nfs_server *server, struct nfs_fh *fhandle,
@@ -1728,7 +2051,7 @@ static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle,
        };
        
        nfs_fattr_init(fattr);
-       return rpc_call_sync(server->client, &msg, 0);
+       return nfs4_call_sync(server, &msg, &args, &res, 0);
 }
 
 static int nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr)
@@ -1812,7 +2135,7 @@ static int _nfs4_proc_lookupfh(struct nfs_server *server, const struct nfs_fh *d
        nfs_fattr_init(fattr);
 
        dprintk("NFS call  lookupfh %s\n", name->name);
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &args, &res, 0);
        dprintk("NFS reply lookupfh: %d\n", status);
        return status;
 }
@@ -1898,7 +2221,7 @@ static int _nfs4_proc_access(struct inode *inode, struct nfs_access_entry *entry
                        args.access |= NFS4_ACCESS_EXECUTE;
        }
        nfs_fattr_init(&fattr);
-       status = rpc_call_sync(NFS_CLIENT(inode), &msg, 0);
+       status = nfs4_call_sync(server, &msg, &args, &res, 0);
        if (!status) {
                entry->mask = 0;
                if (res.access & NFS4_ACCESS_READ)
@@ -1957,13 +2280,14 @@ static int _nfs4_proc_readlink(struct inode *inode, struct page *page,
                .pglen    = pglen,
                .pages    = &page,
        };
+       struct nfs4_readlink_res res;
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_READLINK],
                .rpc_argp = &args,
-               .rpc_resp = NULL,
+               .rpc_resp = &res,
        };
 
-       return rpc_call_sync(NFS_CLIENT(inode), &msg, 0);
+       return nfs4_call_sync(NFS_SERVER(inode), &msg, &args, &res, 0);
 }
 
 static int nfs4_proc_readlink(struct inode *inode, struct page *page,
@@ -2057,7 +2381,7 @@ static int _nfs4_proc_remove(struct inode *dir, struct qstr *name)
        int                     status;
 
        nfs_fattr_init(&res.dir_attr);
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &args, &res, 1);
        if (status == 0) {
                update_changeattr(dir, &res.cinfo);
                nfs_post_op_update_inode(dir, &res.dir_attr);
@@ -2125,7 +2449,7 @@ static int _nfs4_proc_rename(struct inode *old_dir, struct qstr *old_name,
        
        nfs_fattr_init(res.old_fattr);
        nfs_fattr_init(res.new_fattr);
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &arg, &res, 1);
 
        if (!status) {
                update_changeattr(old_dir, &res.old_cinfo);
@@ -2174,7 +2498,7 @@ static int _nfs4_proc_link(struct inode *inode, struct inode *dir, struct qstr *
 
        nfs_fattr_init(res.fattr);
        nfs_fattr_init(res.dir_attr);
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &arg, &res, 1);
        if (!status) {
                update_changeattr(dir, &res.cinfo);
                nfs_post_op_update_inode(dir, res.dir_attr);
@@ -2235,7 +2559,8 @@ static struct nfs4_createdata *nfs4_alloc_createdata(struct inode *dir,
 
 static int nfs4_do_create(struct inode *dir, struct dentry *dentry, struct nfs4_createdata *data)
 {
-       int status = rpc_call_sync(NFS_CLIENT(dir), &data->msg, 0);
+       int status = nfs4_call_sync(NFS_SERVER(dir), &data->msg,
+                                   &data->arg, &data->res, 1);
        if (status == 0) {
                update_changeattr(dir, &data->res.dir_cinfo);
                nfs_post_op_update_inode(dir, data->res.dir_fattr);
@@ -2344,7 +2669,7 @@ static int _nfs4_proc_readdir(struct dentry *dentry, struct rpc_cred *cred,
                        (unsigned long long)cookie);
        nfs4_setup_readdir(cookie, NFS_COOKIEVERF(dir), dentry, &args);
        res.pgbase = args.pgbase;
-       status = rpc_call_sync(NFS_CLIENT(dir), &msg, 0);
+       status = nfs4_call_sync(NFS_SERVER(dir), &msg, &args, &res, 0);
        if (status == 0)
                memcpy(NFS_COOKIEVERF(dir), res.verifier.data, NFS4_VERIFIER_SIZE);
 
@@ -2422,14 +2747,17 @@ static int _nfs4_proc_statfs(struct nfs_server *server, struct nfs_fh *fhandle,
                .fh = fhandle,
                .bitmask = server->attr_bitmask,
        };
+       struct nfs4_statfs_res res = {
+               .fsstat = fsstat,
+       };
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_STATFS],
                .rpc_argp = &args,
-               .rpc_resp = fsstat,
+               .rpc_resp = &res,
        };
 
        nfs_fattr_init(fsstat->fattr);
-       return rpc_call_sync(server->client, &msg, 0);
+       return  nfs4_call_sync(server, &msg, &args, &res, 0);
 }
 
 static int nfs4_proc_statfs(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fsstat *fsstat)
@@ -2451,13 +2779,16 @@ static int _nfs4_do_fsinfo(struct nfs_server *server, struct nfs_fh *fhandle,
                .fh = fhandle,
                .bitmask = server->attr_bitmask,
        };
+       struct nfs4_fsinfo_res res = {
+               .fsinfo = fsinfo,
+       };
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_FSINFO],
                .rpc_argp = &args,
-               .rpc_resp = fsinfo,
+               .rpc_resp = &res,
        };
 
-       return rpc_call_sync(server->client, &msg, 0);
+       return nfs4_call_sync(server, &msg, &args, &res, 0);
 }
 
 static int nfs4_do_fsinfo(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fsinfo *fsinfo)
@@ -2486,10 +2817,13 @@ static int _nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle
                .fh = fhandle,
                .bitmask = server->attr_bitmask,
        };
+       struct nfs4_pathconf_res res = {
+               .pathconf = pathconf,
+       };
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_PATHCONF],
                .rpc_argp = &args,
-               .rpc_resp = pathconf,
+               .rpc_resp = &res,
        };
 
        /* None of the pathconf attributes are mandatory to implement */
@@ -2499,7 +2833,7 @@ static int _nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle
        }
 
        nfs_fattr_init(pathconf->fattr);
-       return rpc_call_sync(server->client, &msg, 0);
+       return nfs4_call_sync(server, &msg, &args, &res, 0);
 }
 
 static int nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
@@ -2742,12 +3076,14 @@ static ssize_t __nfs4_get_acl_uncached(struct inode *inode, void *buf, size_t bu
                .acl_pages = pages,
                .acl_len = buflen,
        };
-       size_t resp_len = buflen;
+       struct nfs_getaclres res = {
+               .acl_len = buflen,
+       };
        void *resp_buf;
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_GETACL],
                .rpc_argp = &args,
-               .rpc_resp = &resp_len,
+               .rpc_resp = &res,
        };
        struct page *localpage = NULL;
        int ret;
@@ -2761,26 +3097,26 @@ static ssize_t __nfs4_get_acl_uncached(struct inode *inode, void *buf, size_t bu
                        return -ENOMEM;
                args.acl_pages[0] = localpage;
                args.acl_pgbase = 0;
-               resp_len = args.acl_len = PAGE_SIZE;
+               args.acl_len = PAGE_SIZE;
        } else {
                resp_buf = buf;
                buf_to_pages(buf, buflen, args.acl_pages, &args.acl_pgbase);
        }
-       ret = rpc_call_sync(NFS_CLIENT(inode), &msg, 0);
+       ret = nfs4_call_sync(NFS_SERVER(inode), &msg, &args, &res, 0);
        if (ret)
                goto out_free;
-       if (resp_len > args.acl_len)
-               nfs4_write_cached_acl(inode, NULL, resp_len);
+       if (res.acl_len > args.acl_len)
+               nfs4_write_cached_acl(inode, NULL, res.acl_len);
        else
-               nfs4_write_cached_acl(inode, resp_buf, resp_len);
+               nfs4_write_cached_acl(inode, resp_buf, res.acl_len);
        if (buf) {
                ret = -ERANGE;
-               if (resp_len > buflen)
+               if (res.acl_len > buflen)
                        goto out_free;
                if (localpage)
-                       memcpy(buf, resp_buf, resp_len);
+                       memcpy(buf, resp_buf, res.acl_len);
        }
-       ret = resp_len;
+       ret = res.acl_len;
 out_free:
        if (localpage)
                __free_page(localpage);
@@ -2827,10 +3163,11 @@ static int __nfs4_proc_set_acl(struct inode *inode, const void *buf, size_t bufl
                .acl_pages      = pages,
                .acl_len        = buflen,
        };
+       struct nfs_setaclres res;
        struct rpc_message msg = {
                .rpc_proc       = &nfs4_procedures[NFSPROC4_CLNT_SETACL],
                .rpc_argp       = &arg,
-               .rpc_resp       = NULL,
+               .rpc_resp       = &res,
        };
        int ret;
 
@@ -2838,7 +3175,7 @@ static int __nfs4_proc_set_acl(struct inode *inode, const void *buf, size_t bufl
                return -EOPNOTSUPP;
        nfs_inode_return_delegation(inode);
        buf_to_pages(buf, buflen, arg.acl_pages, &arg.acl_pgbase);
-       ret = rpc_call_sync(NFS_CLIENT(inode), &msg, 0);
+       ret = nfs4_call_sync(server, &msg, &arg, &res, 1);
        nfs_access_zap_cache(inode);
        nfs_zap_acl_cache(inode);
        return ret;
@@ -3042,6 +3379,7 @@ static int _nfs4_proc_delegreturn(struct inode *inode, struct rpc_cred *cred, co
        memcpy(&data->stateid, stateid, sizeof(data->stateid));
        data->res.fattr = &data->fattr;
        data->res.server = server;
+       data->res.seq_res.sr_slotid = NFS4_MAX_SLOT_TABLE;
        nfs_fattr_init(data->res.fattr);
        data->timestamp = jiffies;
        data->rpc_status = 0;
@@ -3127,7 +3465,7 @@ static int _nfs4_proc_getlk(struct nfs4_state *state, int cmd, struct file_lock
                goto out;
        lsp = request->fl_u.nfs4_fl.owner;
        arg.lock_owner.id = lsp->ls_id.id;
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &arg, &res, 1);
        switch (status) {
                case 0:
                        request->fl_type = F_UNLCK;
@@ -3194,6 +3532,7 @@ static struct nfs4_unlockdata *nfs4_alloc_unlockdata(struct file_lock *fl,
        p->arg.fl = &p->fl;
        p->arg.seqid = seqid;
        p->res.seqid = seqid;
+       p->res.seq_res.sr_slotid = NFS4_MAX_SLOT_TABLE;
        p->arg.stateid = &lsp->ls_stateid;
        p->lsp = lsp;
        atomic_inc(&lsp->ls_count);
@@ -3366,6 +3705,7 @@ static struct nfs4_lockdata *nfs4_alloc_lockdata(struct file_lock *fl,
        p->arg.lock_owner.clientid = server->nfs_client->cl_clientid;
        p->arg.lock_owner.id = lsp->ls_id.id;
        p->res.lock_seqid = p->arg.lock_seqid;
+       p->res.seq_res.sr_slotid = NFS4_MAX_SLOT_TABLE;
        p->lsp = lsp;
        atomic_inc(&lsp->ls_count);
        p->ctx = get_nfs_open_context(ctx);
@@ -3706,10 +4046,13 @@ int nfs4_proc_fs_locations(struct inode *dir, const struct qstr *name,
                .page = page,
                .bitmask = bitmask,
        };
+       struct nfs4_fs_locations_res res = {
+               .fs_locations = fs_locations,
+       };
        struct rpc_message msg = {
                .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_FS_LOCATIONS],
                .rpc_argp = &args,
-               .rpc_resp = fs_locations,
+               .rpc_resp = &res,
        };
        int status;
 
@@ -3717,12 +4060,46 @@ int nfs4_proc_fs_locations(struct inode *dir, const struct qstr *name,
        nfs_fattr_init(&fs_locations->fattr);
        fs_locations->server = server;
        fs_locations->nlocations = 0;
-       status = rpc_call_sync(server->client, &msg, 0);
+       status = nfs4_call_sync(server, &msg, &args, &res, 0);
        nfs_fixup_referral_attributes(&fs_locations->fattr);
        dprintk("%s: returned status = %d\n", __func__, status);
        return status;
 }
 
+#ifdef CONFIG_NFS_V4_1
+/* Destroy the slot table */
+static void nfs4_destroy_slot_table(struct nfs4_session *session)
+{
+       if (session->fc_slot_table.slots == NULL)
+               return;
+       kfree(session->fc_slot_table.slots);
+       session->fc_slot_table.slots = NULL;
+       return;
+}
+
+struct nfs4_session *nfs4_alloc_session(struct nfs_client *clp)
+{
+       struct nfs4_session *session;
+       struct nfs4_slot_table *tbl;
+
+       session = kzalloc(sizeof(struct nfs4_session), GFP_KERNEL);
+       if (!session)
+               return NULL;
+       tbl = &session->fc_slot_table;
+       spin_lock_init(&tbl->slot_tbl_lock);
+       rpc_init_wait_queue(&tbl->slot_tbl_waitq, "Slot table");
+       session->clp = clp;
+       return session;
+}
+
+void nfs4_destroy_session(struct nfs4_session *session)
+{
+       nfs4_destroy_slot_table(session);
+       kfree(session);
+}
+
+#endif /* CONFIG_NFS_V4_1 */
+
 struct nfs4_state_recovery_ops nfs4_reboot_recovery_ops = {
        .owner_flag_bit = NFS_OWNER_RECLAIM_REBOOT,
        .state_flag_bit = NFS_STATE_RECLAIM_REBOOT,