Merge tag 'ext4_for_linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tytso...
[linux-2.6-block.git] / drivers / staging / lustre / lustre / mdc / mdc_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 # include <linux/module.h>
36 # include <linux/kernel.h>
37
38 #include "../include/obd_class.h"
39 #include "mdc_internal.h"
40 #include "../include/lustre_fid.h"
41
42 /* mdc_setattr does its own semaphore handling */
43 static int mdc_reint(struct ptlrpc_request *request,
44                      struct mdc_rpc_lock *rpc_lock,
45                      int level)
46 {
47         int rc;
48
49         request->rq_send_state = level;
50
51         mdc_get_rpc_lock(rpc_lock, NULL);
52         rc = ptlrpc_queue_wait(request);
53         mdc_put_rpc_lock(rpc_lock, NULL);
54         if (rc)
55                 CDEBUG(D_INFO, "error in handling %d\n", rc);
56         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
57                 rc = -EPROTO;
58
59         return rc;
60 }
61
62 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
63  * found by @fid. Found locks are added into @cancel list. Returns the amount of
64  * locks added to @cancels list.
65  */
66 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
67                             struct list_head *cancels, enum ldlm_mode mode,
68                             __u64 bits)
69 {
70         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
71         ldlm_policy_data_t policy = {};
72         struct ldlm_res_id res_id;
73         struct ldlm_resource *res;
74         int count;
75
76         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
77          * export) but disabled through procfs (flag in NS).
78          *
79          * This distinguishes from a case when ELC is not supported originally,
80          * when we still want to cancel locks in advance and just cancel them
81          * locally, without sending any RPC.
82          */
83         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
84                 return 0;
85
86         fid_build_reg_res_name(fid, &res_id);
87         res = ldlm_resource_get(exp->exp_obd->obd_namespace,
88                                 NULL, &res_id, 0, 0);
89         if (!res)
90                 return 0;
91         LDLM_RESOURCE_ADDREF(res);
92         /* Initialize ibits lock policy. */
93         policy.l_inodebits.bits = bits;
94         count = ldlm_cancel_resource_local(res, cancels, &policy,
95                                            mode, 0, 0, NULL);
96         LDLM_RESOURCE_DELREF(res);
97         ldlm_resource_putref(res);
98         return count;
99 }
100
101 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
102                 void *ea, int ealen, void *ea2, int ea2len,
103                 struct ptlrpc_request **request, struct md_open_data **mod)
104 {
105         LIST_HEAD(cancels);
106         struct ptlrpc_request *req;
107         struct mdc_rpc_lock *rpc_lock;
108         struct obd_device *obd = exp->exp_obd;
109         int count = 0, rc;
110         __u64 bits;
111
112         bits = MDS_INODELOCK_UPDATE;
113         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
114                 bits |= MDS_INODELOCK_LOOKUP;
115         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
116             (fid_is_sane(&op_data->op_fid1)) &&
117             !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
118                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
119                                                 &cancels, LCK_EX, bits);
120         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
121                                    &RQF_MDS_REINT_SETATTR);
122         if (!req) {
123                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
124                 return -ENOMEM;
125         }
126         if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
127                 req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
128                                      0);
129         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
130         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
131                              ea2len);
132
133         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
134         if (rc) {
135                 ptlrpc_request_free(req);
136                 return rc;
137         }
138
139         rpc_lock = obd->u.cli.cl_rpc_lock;
140
141         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
142                 CDEBUG(D_INODE, "setting mtime %ld, ctime %ld\n",
143                        LTIME_S(op_data->op_attr.ia_mtime),
144                        LTIME_S(op_data->op_attr.ia_ctime));
145         mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
146
147         ptlrpc_request_set_replen(req);
148         if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
149             req->rq_import->imp_replayable) {
150                 LASSERT(!*mod);
151
152                 *mod = obd_mod_alloc();
153                 if (!*mod) {
154                         DEBUG_REQ(D_ERROR, req, "Can't allocate md_open_data");
155                 } else {
156                         req->rq_replay = 1;
157                         req->rq_cb_data = *mod;
158                         (*mod)->mod_open_req = req;
159                         req->rq_commit_cb = mdc_commit_open;
160                         (*mod)->mod_is_create = true;
161                         /**
162                          * Take an extra reference on \var mod, it protects \var
163                          * mod from being freed on eviction (commit callback is
164                          * called despite rq_replay flag).
165                          * Will be put on mdc_done_writing().
166                          */
167                         obd_mod_get(*mod);
168                 }
169         }
170
171         rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
172
173         /* Save the obtained info in the original RPC for the replay case. */
174         if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
175                 struct mdt_ioepoch *epoch;
176                 struct mdt_body  *body;
177
178                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
179                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
180                 epoch->handle = body->handle;
181                 epoch->ioepoch = body->ioepoch;
182                 req->rq_replay_cb = mdc_replay_open;
183         /** bug 3633, open may be committed and estale answer is not error */
184         } else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
185                 rc = 0;
186         } else if (rc == -ERESTARTSYS) {
187                 rc = 0;
188         }
189         *request = req;
190         if (rc && req->rq_commit_cb) {
191                 /* Put an extra reference on \var mod on error case. */
192                 if (mod && *mod)
193                         obd_mod_put(*mod);
194                 req->rq_commit_cb(req);
195         }
196         return rc;
197 }
198
199 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
200                const void *data, int datalen, int mode, __u32 uid, __u32 gid,
201                cfs_cap_t cap_effective, __u64 rdev,
202                struct ptlrpc_request **request)
203 {
204         struct ptlrpc_request *req;
205         int level, rc;
206         int count, resends = 0;
207         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
208         int generation = import->imp_generation;
209         LIST_HEAD(cancels);
210
211         /* For case if upper layer did not alloc fid, do it now. */
212         if (!fid_is_sane(&op_data->op_fid2)) {
213                 /*
214                  * mdc_fid_alloc() may return errno 1 in case of switch to new
215                  * sequence, handle this.
216                  */
217                 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
218                 if (rc < 0) {
219                         CERROR("Can't alloc new fid, rc %d\n", rc);
220                         return rc;
221                 }
222         }
223
224 rebuild:
225         count = 0;
226         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
227             (fid_is_sane(&op_data->op_fid1)))
228                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
229                                                 &cancels, LCK_EX,
230                                                 MDS_INODELOCK_UPDATE);
231
232         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
233                                    &RQF_MDS_REINT_CREATE_ACL);
234         if (!req) {
235                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
236                 return -ENOMEM;
237         }
238         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
239                              op_data->op_namelen + 1);
240         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
241                              data && datalen ? datalen : 0);
242
243         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
244         if (rc) {
245                 ptlrpc_request_free(req);
246                 return rc;
247         }
248
249         /*
250          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
251          * tgt, for symlinks or lov MD data.
252          */
253         mdc_create_pack(req, op_data, data, datalen, mode, uid,
254                         gid, cap_effective, rdev);
255
256         ptlrpc_request_set_replen(req);
257
258         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
259          * logic here
260          */
261         req->rq_no_retry_einprogress = 1;
262
263         if (resends) {
264                 req->rq_generation_set = 1;
265                 req->rq_import_generation = generation;
266                 req->rq_sent = ktime_get_real_seconds() + resends;
267         }
268         level = LUSTRE_IMP_FULL;
269  resend:
270         rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
271
272         /* Resend if we were told to. */
273         if (rc == -ERESTARTSYS) {
274                 level = LUSTRE_IMP_RECOVER;
275                 goto resend;
276         } else if (rc == -EINPROGRESS) {
277                 /* Retry create infinitely until succeed or get other
278                  * error code.
279                  */
280                 ptlrpc_req_finished(req);
281                 resends++;
282
283                 CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
284                        exp->exp_obd->obd_name, resends,
285                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
286
287                 if (generation == import->imp_generation) {
288                         goto rebuild;
289                 } else {
290                         CDEBUG(D_HA, "resend cross eviction\n");
291                         return -EIO;
292                 }
293         }
294
295         *request = req;
296         return rc;
297 }
298
299 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
300                struct ptlrpc_request **request)
301 {
302         LIST_HEAD(cancels);
303         struct obd_device *obd = class_exp2obd(exp);
304         struct ptlrpc_request *req = *request;
305         int count = 0, rc;
306
307         LASSERT(!req);
308
309         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
310             (fid_is_sane(&op_data->op_fid1)) &&
311             !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
312                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
313                                                 &cancels, LCK_EX,
314                                                 MDS_INODELOCK_UPDATE);
315         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
316             (fid_is_sane(&op_data->op_fid3)) &&
317             !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
318                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
319                                                  &cancels, LCK_EX,
320                                                  MDS_INODELOCK_FULL);
321         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
322                                    &RQF_MDS_REINT_UNLINK);
323         if (!req) {
324                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
325                 return -ENOMEM;
326         }
327         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
328                              op_data->op_namelen + 1);
329
330         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
331         if (rc) {
332                 ptlrpc_request_free(req);
333                 return rc;
334         }
335
336         mdc_unlink_pack(req, op_data);
337
338         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339                              obd->u.cli.cl_default_mds_easize);
340         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
341                              obd->u.cli.cl_default_mds_cookiesize);
342         ptlrpc_request_set_replen(req);
343
344         *request = req;
345
346         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
347         if (rc == -ERESTARTSYS)
348                 rc = 0;
349         return rc;
350 }
351
352 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
353              struct ptlrpc_request **request)
354 {
355         LIST_HEAD(cancels);
356         struct obd_device *obd = exp->exp_obd;
357         struct ptlrpc_request *req;
358         int count = 0, rc;
359
360         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
361             (fid_is_sane(&op_data->op_fid2)))
362                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
363                                                 &cancels, LCK_EX,
364                                                 MDS_INODELOCK_UPDATE);
365         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
366             (fid_is_sane(&op_data->op_fid1)))
367                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
368                                                  &cancels, LCK_EX,
369                                                  MDS_INODELOCK_UPDATE);
370
371         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
372         if (!req) {
373                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
374                 return -ENOMEM;
375         }
376         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
377                              op_data->op_namelen + 1);
378
379         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
380         if (rc) {
381                 ptlrpc_request_free(req);
382                 return rc;
383         }
384
385         mdc_link_pack(req, op_data);
386         ptlrpc_request_set_replen(req);
387
388         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
389         *request = req;
390         if (rc == -ERESTARTSYS)
391                 rc = 0;
392
393         return rc;
394 }
395
396 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
397                const char *old, int oldlen, const char *new, int newlen,
398                struct ptlrpc_request **request)
399 {
400         LIST_HEAD(cancels);
401         struct obd_device *obd = exp->exp_obd;
402         struct ptlrpc_request *req;
403         int count = 0, rc;
404
405         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
406             (fid_is_sane(&op_data->op_fid1)))
407                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
408                                                 &cancels, LCK_EX,
409                                                 MDS_INODELOCK_UPDATE);
410         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
411             (fid_is_sane(&op_data->op_fid2)))
412                 count += mdc_resource_get_unused(exp, &op_data->op_fid2,
413                                                  &cancels, LCK_EX,
414                                                  MDS_INODELOCK_UPDATE);
415         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
416             (fid_is_sane(&op_data->op_fid3)))
417                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
418                                                  &cancels, LCK_EX,
419                                                  MDS_INODELOCK_LOOKUP);
420         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
421             (fid_is_sane(&op_data->op_fid4)))
422                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
423                                                  &cancels, LCK_EX,
424                                                  MDS_INODELOCK_FULL);
425
426         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
427                                    &RQF_MDS_REINT_RENAME);
428         if (!req) {
429                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
430                 return -ENOMEM;
431         }
432
433         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
434         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
435
436         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
437         if (rc) {
438                 ptlrpc_request_free(req);
439                 return rc;
440         }
441
442         if (exp_connect_cancelset(exp) && req)
443                 ldlm_cli_cancel_list(&cancels, count, req, 0);
444
445         mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
446
447         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
448                              obd->u.cli.cl_default_mds_easize);
449         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
450                              obd->u.cli.cl_default_mds_cookiesize);
451         ptlrpc_request_set_replen(req);
452
453         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
454         *request = req;
455         if (rc == -ERESTARTSYS)
456                 rc = 0;
457
458         return rc;
459 }