Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jikos/hid
[linux-2.6-block.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
43
44 #include <lustre_acl.h>
45 #include <obd_class.h>
46 #include <lustre_dlm.h>
47 /* fid_res_name_eq() */
48 #include <lustre_fid.h>
49 #include <lprocfs_status.h>
50 #include "mdc_internal.h"
51
/*
 * Bundle of three pointers describing one getattr enqueue: the export it
 * runs on and its two enqueue-info descriptors.
 * NOTE(review): the code that fills and consumes this struct is outside
 * this part of the file -- presumably it is the async argument block of an
 * intent getattr request; confirm against its users.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;
	struct md_enqueue_info		*ga_minfo;
	struct ldlm_enqueue_info	*ga_einfo;
};
57
58 int it_disposition(struct lookup_intent *it, int flag)
59 {
60         return it->d.lustre.it_disposition & flag;
61 }
62 EXPORT_SYMBOL(it_disposition);
63
64 void it_set_disposition(struct lookup_intent *it, int flag)
65 {
66         it->d.lustre.it_disposition |= flag;
67 }
68 EXPORT_SYMBOL(it_set_disposition);
69
70 void it_clear_disposition(struct lookup_intent *it, int flag)
71 {
72         it->d.lustre.it_disposition &= ~flag;
73 }
74 EXPORT_SYMBOL(it_clear_disposition);
75
76 int it_open_error(int phase, struct lookup_intent *it)
77 {
78         if (it_disposition(it, DISP_OPEN_OPEN)) {
79                 if (phase >= DISP_OPEN_OPEN)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_OPEN_CREATE)) {
86                 if (phase >= DISP_OPEN_CREATE)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91
92         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
93                 if (phase >= DISP_LOOKUP_EXECD)
94                         return it->d.lustre.it_status;
95                 else
96                         return 0;
97         }
98
99         if (it_disposition(it, DISP_IT_EXECD)) {
100                 if (phase >= DISP_IT_EXECD)
101                         return it->d.lustre.it_status;
102                 else
103                         return 0;
104         }
105         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
106                it->d.lustre.it_status);
107         LBUG();
108         return 0;
109 }
110 EXPORT_SYMBOL(it_open_error);
111
112 /* this must be called on a lockh that is known to have a referenced lock */
113 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
114                       __u64 *bits)
115 {
116         struct ldlm_lock *lock;
117         struct inode *new_inode = data;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh)
124                 RETURN(0);
125
126         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
127
128         LASSERT(lock != NULL);
129         lock_res_and_lock(lock);
130         if (lock->l_resource->lr_lvb_inode &&
131             lock->l_resource->lr_lvb_inode != data) {
132                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
133                 LASSERTF(old_inode->i_state & I_FREEING,
134                          "Found existing inode %p/%lu/%u state %lu in lock: "
135                          "setting data to %p/%lu/%u\n", old_inode,
136                          old_inode->i_ino, old_inode->i_generation,
137                          old_inode->i_state,
138                          new_inode, new_inode->i_ino, new_inode->i_generation);
139         }
140         lock->l_resource->lr_lvb_inode = new_inode;
141         if (bits)
142                 *bits = lock->l_policy_data.l_inodebits.bits;
143
144         unlock_res_and_lock(lock);
145         LDLM_LOCK_PUT(lock);
146
147         RETURN(0);
148 }
149
150 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
151                            const struct lu_fid *fid, ldlm_type_t type,
152                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
153                            struct lustre_handle *lockh)
154 {
155         struct ldlm_res_id res_id;
156         ldlm_mode_t rc;
157         ENTRY;
158
159         fid_build_reg_res_name(fid, &res_id);
160         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
161                              &res_id, type, policy, mode, lockh, 0);
162         RETURN(rc);
163 }
164
165 int mdc_cancel_unused(struct obd_export *exp,
166                       const struct lu_fid *fid,
167                       ldlm_policy_data_t *policy,
168                       ldlm_mode_t mode,
169                       ldlm_cancel_flags_t flags,
170                       void *opaque)
171 {
172         struct ldlm_res_id res_id;
173         struct obd_device *obd = class_exp2obd(exp);
174         int rc;
175
176         ENTRY;
177
178         fid_build_reg_res_name(fid, &res_id);
179         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
180                                              policy, mode, flags, opaque);
181         RETURN(rc);
182 }
183
184 int mdc_null_inode(struct obd_export *exp,
185                    const struct lu_fid *fid)
186 {
187         struct ldlm_res_id res_id;
188         struct ldlm_resource *res;
189         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
190         ENTRY;
191
192         LASSERTF(ns != NULL, "no namespace passed\n");
193
194         fid_build_reg_res_name(fid, &res_id);
195
196         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
197         if(res == NULL)
198                 RETURN(0);
199
200         lock_res(res);
201         res->lr_lvb_inode = NULL;
202         unlock_res(res);
203
204         ldlm_resource_putref(res);
205         RETURN(0);
206 }
207
208 /* find any ldlm lock of the inode in mdc
209  * return 0    not find
210  *      1    find one
211  *      < 0    error */
212 int mdc_find_cbdata(struct obd_export *exp,
213                     const struct lu_fid *fid,
214                     ldlm_iterator_t it, void *data)
215 {
216         struct ldlm_res_id res_id;
217         int rc = 0;
218         ENTRY;
219
220         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
221         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
222                                    it, data);
223         if (rc == LDLM_ITER_STOP)
224                 RETURN(1);
225         else if (rc == LDLM_ITER_CONTINUE)
226                 RETURN(0);
227         RETURN(rc);
228 }
229
230 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
231 {
232         /* Don't hold error requests for replay. */
233         if (req->rq_replay) {
234                 spin_lock(&req->rq_lock);
235                 req->rq_replay = 0;
236                 spin_unlock(&req->rq_lock);
237         }
238         if (rc && req->rq_transno != 0) {
239                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240                 LBUG();
241         }
242 }
243
244 /* Save a large LOV EA into the request buffer so that it is available
245  * for replay.  We don't do this in the initial request because the
246  * original request doesn't need this buffer (at most it sends just the
247  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
248  * buffer and may also be difficult to allocate and save a very large
249  * request buffer for each open. (bug 5707)
250  *
251  * OOM here may cause recovery failure if lmm is needed (only for the
252  * original open if the MDS crashed just when this client also OOM'd)
253  * but this is incredibly unlikely, and questionable whether the client
254  * could do MDS recovery under OOM anyways... */
255 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
256                                 struct mdt_body *body)
257 {
258         int     rc;
259
260         /* FIXME: remove this explicit offset. */
261         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
262                                         body->eadatasize);
263         if (rc) {
264                 CERROR("Can't enlarge segment %d size to %d\n",
265                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
266                 body->valid &= ~OBD_MD_FLEASIZE;
267                 body->eadatasize = 0;
268         }
269 }
270
271 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
272                                                    struct lookup_intent *it,
273                                                    struct md_op_data *op_data,
274                                                    void *lmm, int lmmsize,
275                                                    void *cb_data)
276 {
277         struct ptlrpc_request *req;
278         struct obd_device     *obddev = class_exp2obd(exp);
279         struct ldlm_intent    *lit;
280         LIST_HEAD(cancels);
281         int                 count = 0;
282         int                 mode;
283         int                 rc;
284         ENTRY;
285
286         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
287
288         /* XXX: openlock is not cancelled for cross-refs. */
289         /* If inode is known, cancel conflicting OPEN locks. */
290         if (fid_is_sane(&op_data->op_fid2)) {
291                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
292                         mode = LCK_CW;
293 #ifdef FMODE_EXEC
294                 else if (it->it_flags & FMODE_EXEC)
295                         mode = LCK_PR;
296 #endif
297                 else
298                         mode = LCK_CR;
299                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300                                                 &cancels, mode,
301                                                 MDS_INODELOCK_OPEN);
302         }
303
304         /* If CREATE, cancel parent's UPDATE lock. */
305         if (it->it_op & IT_CREAT)
306                 mode = LCK_EX;
307         else
308                 mode = LCK_CR;
309         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
310                                          &cancels, mode,
311                                          MDS_INODELOCK_UPDATE);
312
313         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
314                                    &RQF_LDLM_INTENT_OPEN);
315         if (req == NULL) {
316                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
317                 RETURN(ERR_PTR(-ENOMEM));
318         }
319
320         /* parent capability */
321         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
322         /* child capability, reserve the size according to parent capa, it will
323          * be filled after we get the reply */
324         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
325
326         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
327                              op_data->op_namelen + 1);
328         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
329                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
330
331         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
332         if (rc) {
333                 ptlrpc_request_free(req);
334                 return NULL;
335         }
336
337         spin_lock(&req->rq_lock);
338         req->rq_replay = req->rq_import->imp_replayable;
339         spin_unlock(&req->rq_lock);
340
341         /* pack the intent */
342         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
343         lit->opc = (__u64)it->it_op;
344
345         /* pack the intended request */
346         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
347                       lmmsize);
348
349         /* for remote client, fetch remote perm for current user */
350         if (client_is_remote(exp))
351                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
352                                      sizeof(struct mdt_remote_perm));
353         ptlrpc_request_set_replen(req);
354         return req;
355 }
356
357 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
358                                                      struct lookup_intent *it,
359                                                      struct md_op_data *op_data)
360 {
361         struct ptlrpc_request *req;
362         struct obd_device     *obddev = class_exp2obd(exp);
363         struct ldlm_intent    *lit;
364         int                 rc;
365         ENTRY;
366
367         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
368                                    &RQF_LDLM_INTENT_UNLINK);
369         if (req == NULL)
370                 RETURN(ERR_PTR(-ENOMEM));
371
372         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
373         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
374                              op_data->op_namelen + 1);
375
376         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
377         if (rc) {
378                 ptlrpc_request_free(req);
379                 RETURN(ERR_PTR(rc));
380         }
381
382         /* pack the intent */
383         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
384         lit->opc = (__u64)it->it_op;
385
386         /* pack the intended request */
387         mdc_unlink_pack(req, op_data);
388
389         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
390                              obddev->u.cli.cl_max_mds_easize);
391         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
392                              obddev->u.cli.cl_max_mds_cookiesize);
393         ptlrpc_request_set_replen(req);
394         RETURN(req);
395 }
396
397 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
398                                                       struct lookup_intent *it,
399                                                       struct md_op_data *op_data)
400 {
401         struct ptlrpc_request *req;
402         struct obd_device     *obddev = class_exp2obd(exp);
403         obd_valid             valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
404                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
405                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
406                                        (client_is_remote(exp) ?
407                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
408         struct ldlm_intent    *lit;
409         int                 rc;
410         ENTRY;
411
412         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
413                                    &RQF_LDLM_INTENT_GETATTR);
414         if (req == NULL)
415                 RETURN(ERR_PTR(-ENOMEM));
416
417         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
418         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
419                              op_data->op_namelen + 1);
420
421         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
422         if (rc) {
423                 ptlrpc_request_free(req);
424                 RETURN(ERR_PTR(rc));
425         }
426
427         /* pack the intent */
428         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
429         lit->opc = (__u64)it->it_op;
430
431         /* pack the intended request */
432         mdc_getattr_pack(req, valid, it->it_flags, op_data,
433                          obddev->u.cli.cl_max_mds_easize);
434
435         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
436                              obddev->u.cli.cl_max_mds_easize);
437         if (client_is_remote(exp))
438                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
439                                      sizeof(struct mdt_remote_perm));
440         ptlrpc_request_set_replen(req);
441         RETURN(req);
442 }
443
444 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
445                                                      struct lookup_intent *it,
446                                                      struct md_op_data *unused)
447 {
448         struct obd_device     *obd = class_exp2obd(exp);
449         struct ptlrpc_request *req;
450         struct ldlm_intent    *lit;
451         struct layout_intent  *layout;
452         int rc;
453         ENTRY;
454
455         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
456                                 &RQF_LDLM_INTENT_LAYOUT);
457         if (req == NULL)
458                 RETURN(ERR_PTR(-ENOMEM));
459
460         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
461         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
462         if (rc) {
463                 ptlrpc_request_free(req);
464                 RETURN(ERR_PTR(rc));
465         }
466
467         /* pack the intent */
468         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469         lit->opc = (__u64)it->it_op;
470
471         /* pack the layout intent request */
472         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
473         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
474          * set for replication */
475         layout->li_opc = LAYOUT_INTENT_ACCESS;
476
477         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
478                         obd->u.cli.cl_max_mds_easize);
479         ptlrpc_request_set_replen(req);
480         RETURN(req);
481 }
482
483 static struct ptlrpc_request *
484 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
485 {
486         struct ptlrpc_request *req;
487         int rc;
488         ENTRY;
489
490         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
491         if (req == NULL)
492                 RETURN(ERR_PTR(-ENOMEM));
493
494         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
495         if (rc) {
496                 ptlrpc_request_free(req);
497                 RETURN(ERR_PTR(rc));
498         }
499
500         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
501         ptlrpc_request_set_replen(req);
502         RETURN(req);
503 }
504
505 static int mdc_finish_enqueue(struct obd_export *exp,
506                               struct ptlrpc_request *req,
507                               struct ldlm_enqueue_info *einfo,
508                               struct lookup_intent *it,
509                               struct lustre_handle *lockh,
510                               int rc)
511 {
512         struct req_capsule  *pill = &req->rq_pill;
513         struct ldlm_request *lockreq;
514         struct ldlm_reply   *lockrep;
515         struct lustre_intent_data *intent = &it->d.lustre;
516         struct ldlm_lock    *lock;
517         void            *lvb_data = NULL;
518         int               lvb_len = 0;
519         ENTRY;
520
521         LASSERT(rc >= 0);
522         /* Similarly, if we're going to replay this request, we don't want to
523          * actually get a lock, just perform the intent. */
524         if (req->rq_transno || req->rq_replay) {
525                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
526                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
527         }
528
529         if (rc == ELDLM_LOCK_ABORTED) {
530                 einfo->ei_mode = 0;
531                 memset(lockh, 0, sizeof(*lockh));
532                 rc = 0;
533         } else { /* rc = 0 */
534                 lock = ldlm_handle2lock(lockh);
535                 LASSERT(lock != NULL);
536
537                 /* If the server gave us back a different lock mode, we should
538                  * fix up our variables. */
539                 if (lock->l_req_mode != einfo->ei_mode) {
540                         ldlm_lock_addref(lockh, lock->l_req_mode);
541                         ldlm_lock_decref(lockh, einfo->ei_mode);
542                         einfo->ei_mode = lock->l_req_mode;
543                 }
544                 LDLM_LOCK_PUT(lock);
545         }
546
547         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
548         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
549
550         intent->it_disposition = (int)lockrep->lock_policy_res1;
551         intent->it_status = (int)lockrep->lock_policy_res2;
552         intent->it_lock_mode = einfo->ei_mode;
553         intent->it_lock_handle = lockh->cookie;
554         intent->it_data = req;
555
556         /* Technically speaking rq_transno must already be zero if
557          * it_status is in error, so the check is a bit redundant */
558         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
559                 mdc_clear_replay_flag(req, intent->it_status);
560
561         /* If we're doing an IT_OPEN which did not result in an actual
562          * successful open, then we need to remove the bit which saves
563          * this request for unconditional replay.
564          *
565          * It's important that we do this first!  Otherwise we might exit the
566          * function without doing so, and try to replay a failed create
567          * (bug 3440) */
568         if (it->it_op & IT_OPEN && req->rq_replay &&
569             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
570                 mdc_clear_replay_flag(req, intent->it_status);
571
572         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
573                   it->it_op, intent->it_disposition, intent->it_status);
574
575         /* We know what to expect, so we do any byte flipping required here */
576         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
577                 struct mdt_body *body;
578
579                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
580                 if (body == NULL) {
581                         CERROR ("Can't swab mdt_body\n");
582                         RETURN (-EPROTO);
583                 }
584
585                 if (it_disposition(it, DISP_OPEN_OPEN) &&
586                     !it_open_error(DISP_OPEN_OPEN, it)) {
587                         /*
588                          * If this is a successful OPEN request, we need to set
589                          * replay handler and data early, so that if replay
590                          * happens immediately after swabbing below, new reply
591                          * is swabbed by that handler correctly.
592                          */
593                         mdc_set_open_replay_data(NULL, NULL, req);
594                 }
595
596                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
597                         void *eadata;
598
599                         mdc_update_max_ea_from_body(exp, body);
600
601                         /*
602                          * The eadata is opaque; just check that it is there.
603                          * Eventually, obd_unpackmd() will check the contents.
604                          */
605                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
606                                                               body->eadatasize);
607                         if (eadata == NULL)
608                                 RETURN(-EPROTO);
609
610                         /* save lvb data and length in case this is for layout
611                          * lock */
612                         lvb_data = eadata;
613                         lvb_len = body->eadatasize;
614
615                         /*
616                          * We save the reply LOV EA in case we have to replay a
617                          * create for recovery.  If we didn't allocate a large
618                          * enough request buffer above we need to reallocate it
619                          * here to hold the actual LOV EA.
620                          *
621                          * To not save LOV EA if request is not going to replay
622                          * (for example error one).
623                          */
624                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
625                                 void *lmm;
626                                 if (req_capsule_get_size(pill, &RMF_EADATA,
627                                                          RCL_CLIENT) <
628                                     body->eadatasize)
629                                         mdc_realloc_openmsg(req, body);
630                                 else
631                                         req_capsule_shrink(pill, &RMF_EADATA,
632                                                            body->eadatasize,
633                                                            RCL_CLIENT);
634
635                                 req_capsule_set_size(pill, &RMF_EADATA,
636                                                      RCL_CLIENT,
637                                                      body->eadatasize);
638
639                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
640                                 if (lmm)
641                                         memcpy(lmm, eadata, body->eadatasize);
642                         }
643                 }
644
645                 if (body->valid & OBD_MD_FLRMTPERM) {
646                         struct mdt_remote_perm *perm;
647
648                         LASSERT(client_is_remote(exp));
649                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
650                                                 lustre_swab_mdt_remote_perm);
651                         if (perm == NULL)
652                                 RETURN(-EPROTO);
653                 }
654                 if (body->valid & OBD_MD_FLMDSCAPA) {
655                         struct lustre_capa *capa, *p;
656
657                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
658                         if (capa == NULL)
659                                 RETURN(-EPROTO);
660
661                         if (it->it_op & IT_OPEN) {
662                                 /* client fid capa will be checked in replay */
663                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
664                                 LASSERT(p);
665                                 *p = *capa;
666                         }
667                 }
668                 if (body->valid & OBD_MD_FLOSSCAPA) {
669                         struct lustre_capa *capa;
670
671                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
672                         if (capa == NULL)
673                                 RETURN(-EPROTO);
674                 }
675         } else if (it->it_op & IT_LAYOUT) {
676                 /* maybe the lock was granted right away and layout
677                  * is packed into RMF_DLM_LVB of req */
678                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
679                 if (lvb_len > 0) {
680                         lvb_data = req_capsule_server_sized_get(pill,
681                                                         &RMF_DLM_LVB, lvb_len);
682                         if (lvb_data == NULL)
683                                 RETURN(-EPROTO);
684                 }
685         }
686
687         /* fill in stripe data for layout lock */
688         lock = ldlm_handle2lock(lockh);
689         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
690                 void *lmm;
691
692                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
693                         ldlm_it2str(it->it_op), lvb_len);
694
695                 OBD_ALLOC_LARGE(lmm, lvb_len);
696                 if (lmm == NULL) {
697                         LDLM_LOCK_PUT(lock);
698                         RETURN(-ENOMEM);
699                 }
700                 memcpy(lmm, lvb_data, lvb_len);
701
702                 /* install lvb_data */
703                 lock_res_and_lock(lock);
704                 if (lock->l_lvb_data == NULL) {
705                         lock->l_lvb_data = lmm;
706                         lock->l_lvb_len = lvb_len;
707                         lmm = NULL;
708                 }
709                 unlock_res_and_lock(lock);
710                 if (lmm != NULL)
711                         OBD_FREE_LARGE(lmm, lvb_len);
712         }
713         if (lock != NULL)
714                 LDLM_LOCK_PUT(lock);
715
716         RETURN(rc);
717 }
718
719 /* We always reserve enough space in the reply packet for a stripe MD, because
720  * we don't know in advance the file type. */
721 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
722                 struct lookup_intent *it, struct md_op_data *op_data,
723                 struct lustre_handle *lockh, void *lmm, int lmmsize,
724                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
725 {
726         struct obd_device     *obddev = class_exp2obd(exp);
727         struct ptlrpc_request *req = NULL;
728         __u64             flags, saved_flags = extra_lock_flags;
729         int                 rc;
730         struct ldlm_res_id res_id;
731         static const ldlm_policy_data_t lookup_policy =
732                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
733         static const ldlm_policy_data_t update_policy =
734                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
735         static const ldlm_policy_data_t layout_policy =
736                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
737         ldlm_policy_data_t const *policy = &lookup_policy;
738         int                 generation, resends = 0;
739         struct ldlm_reply     *lockrep;
740         enum lvb_type          lvb_type = 0;
741         ENTRY;
742
743         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
744                  einfo->ei_type);
745
746         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
747
748         if (it) {
749                 saved_flags |= LDLM_FL_HAS_INTENT;
750                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
751                         policy = &update_policy;
752                 else if (it->it_op & IT_LAYOUT)
753                         policy = &layout_policy;
754         }
755
756         LASSERT(reqp == NULL);
757
758         generation = obddev->u.cli.cl_import->imp_generation;
759 resend:
760         flags = saved_flags;
761         if (!it) {
762                 /* The only way right now is FLOCK, in this case we hide flock
763                    policy as lmm, but lmmsize is 0 */
764                 LASSERT(lmm && lmmsize == 0);
765                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
766                          einfo->ei_type);
767                 policy = (ldlm_policy_data_t *)lmm;
768                 res_id.name[3] = LDLM_FLOCK;
769         } else if (it->it_op & IT_OPEN) {
770                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
771                                            einfo->ei_cbdata);
772                 policy = &update_policy;
773                 einfo->ei_cbdata = NULL;
774                 lmm = NULL;
775         } else if (it->it_op & IT_UNLINK) {
776                 req = mdc_intent_unlink_pack(exp, it, op_data);
777         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
778                 req = mdc_intent_getattr_pack(exp, it, op_data);
779         } else if (it->it_op & IT_READDIR) {
780                 req = mdc_enqueue_pack(exp, 0);
781         } else if (it->it_op & IT_LAYOUT) {
782                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
783                         RETURN(-EOPNOTSUPP);
784
785                 req = mdc_intent_layout_pack(exp, it, op_data);
786                 lvb_type = LVB_T_LAYOUT;
787         } else {
788                 LBUG();
789                 RETURN(-EINVAL);
790         }
791
792         if (IS_ERR(req))
793                 RETURN(PTR_ERR(req));
794
795         if (req != NULL && it && it->it_op & IT_CREAT)
796                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
797                  * retry logic */
798                 req->rq_no_retry_einprogress = 1;
799
800         if (resends) {
801                 req->rq_generation_set = 1;
802                 req->rq_import_generation = generation;
803                 req->rq_sent = cfs_time_current_sec() + resends;
804         }
805
806         /* It is important to obtain rpc_lock first (if applicable), so that
807          * threads that are serialised with rpc_lock are not polluting our
808          * rpcs in flight counter. We do not do flock request limiting, though*/
809         if (it) {
810                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
811                 rc = mdc_enter_request(&obddev->u.cli);
812                 if (rc != 0) {
813                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
814                         mdc_clear_replay_flag(req, 0);
815                         ptlrpc_req_finished(req);
816                         RETURN(rc);
817                 }
818         }
819
820         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
821                               0, lvb_type, lockh, 0);
822         if (!it) {
823                 /* For flock requests we immediatelly return without further
824                    delay and let caller deal with the rest, since rest of
825                    this function metadata processing makes no sense for flock
826                    requests anyway */
827                 RETURN(rc);
828         }
829
830         mdc_exit_request(&obddev->u.cli);
831         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
832
833         if (rc < 0) {
834                 CERROR("ldlm_cli_enqueue: %d\n", rc);
835                 mdc_clear_replay_flag(req, rc);
836                 ptlrpc_req_finished(req);
837                 RETURN(rc);
838         }
839
840         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
841         LASSERT(lockrep != NULL);
842
843         /* Retry the create infinitely when we get -EINPROGRESS from
844          * server. This is required by the new quota design. */
845         if (it && it->it_op & IT_CREAT &&
846             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
847                 mdc_clear_replay_flag(req, rc);
848                 ptlrpc_req_finished(req);
849                 resends++;
850
851                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
852                        obddev->obd_name, resends, it->it_op,
853                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
854
855                 if (generation == obddev->u.cli.cl_import->imp_generation) {
856                         goto resend;
857                 } else {
858                         CDEBUG(D_HA, "resend cross eviction\n");
859                         RETURN(-EIO);
860                 }
861         }
862
863         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
864         if (rc < 0) {
865                 if (lustre_handle_is_used(lockh)) {
866                         ldlm_lock_decref(lockh, einfo->ei_mode);
867                         memset(lockh, 0, sizeof(*lockh));
868                 }
869                 ptlrpc_req_finished(req);
870         }
871         RETURN(rc);
872 }
873
874 static int mdc_finish_intent_lock(struct obd_export *exp,
875                                   struct ptlrpc_request *request,
876                                   struct md_op_data *op_data,
877                                   struct lookup_intent *it,
878                                   struct lustre_handle *lockh)
879 {
880         struct lustre_handle old_lock;
881         struct mdt_body *mdt_body;
882         struct ldlm_lock *lock;
883         int rc;
884
885
886         LASSERT(request != NULL);
887         LASSERT(request != LP_POISON);
888         LASSERT(request->rq_repmsg != LP_POISON);
889
890         if (!it_disposition(it, DISP_IT_EXECD)) {
891                 /* The server failed before it even started executing the
892                  * intent, i.e. because it couldn't unpack the request. */
893                 LASSERT(it->d.lustre.it_status != 0);
894                 RETURN(it->d.lustre.it_status);
895         }
896         rc = it_open_error(DISP_IT_EXECD, it);
897         if (rc)
898                 RETURN(rc);
899
900         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
901         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
902
903         /* If we were revalidating a fid/name pair, mark the intent in
904          * case we fail and get called again from lookup */
905         if (fid_is_sane(&op_data->op_fid2) &&
906             it->it_create_mode & M_CHECK_STALE &&
907             it->it_op != IT_GETATTR) {
908                 it_set_disposition(it, DISP_ENQ_COMPLETE);
909
910                 /* Also: did we find the same inode? */
911                 /* sever can return one of two fids:
912                  * op_fid2 - new allocated fid - if file is created.
913                  * op_fid3 - existent fid - if file only open.
914                  * op_fid3 is saved in lmv_intent_open */
915                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
916                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
917                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
918                                "\n", PFID(&op_data->op_fid2),
919                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
920                         RETURN(-ESTALE);
921                 }
922         }
923
924         rc = it_open_error(DISP_LOOKUP_EXECD, it);
925         if (rc)
926                 RETURN(rc);
927
928         /* keep requests around for the multiple phases of the call
929          * this shows the DISP_XX must guarantee we make it into the call
930          */
931         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
932             it_disposition(it, DISP_OPEN_CREATE) &&
933             !it_open_error(DISP_OPEN_CREATE, it)) {
934                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
935                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
936         }
937         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
938             it_disposition(it, DISP_OPEN_OPEN) &&
939             !it_open_error(DISP_OPEN_OPEN, it)) {
940                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
941                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
942                 /* BUG 11546 - eviction in the middle of open rpc processing */
943                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
944         }
945
946         if (it->it_op & IT_CREAT) {
947                 /* XXX this belongs in ll_create_it */
948         } else if (it->it_op == IT_OPEN) {
949                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
950         } else {
951                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
952         }
953
954         /* If we already have a matching lock, then cancel the new
955          * one.  We have to set the data here instead of in
956          * mdc_enqueue, because we need to use the child's inode as
957          * the l_ast_data to match, and that's not available until
958          * intent_finish has performed the iget().) */
959         lock = ldlm_handle2lock(lockh);
960         if (lock) {
961                 ldlm_policy_data_t policy = lock->l_policy_data;
962                 LDLM_DEBUG(lock, "matching against this");
963
964                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
965                                          &lock->l_resource->lr_name),
966                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
967                          (unsigned long)lock->l_resource->lr_name.name[0],
968                          (unsigned long)lock->l_resource->lr_name.name[1],
969                          (unsigned long)lock->l_resource->lr_name.name[2],
970                          (unsigned long)fid_seq(&mdt_body->fid1),
971                          (unsigned long)fid_oid(&mdt_body->fid1),
972                          (unsigned long)fid_ver(&mdt_body->fid1));
973                 LDLM_LOCK_PUT(lock);
974
975                 memcpy(&old_lock, lockh, sizeof(*lockh));
976                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
977                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
978                         ldlm_lock_decref_and_cancel(lockh,
979                                                     it->d.lustre.it_lock_mode);
980                         memcpy(lockh, &old_lock, sizeof(old_lock));
981                         it->d.lustre.it_lock_handle = lockh->cookie;
982                 }
983         }
984         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
985                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
986                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
987         RETURN(rc);
988 }
989
990 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
991                         struct lu_fid *fid, __u64 *bits)
992 {
993         /* We could just return 1 immediately, but since we should only
994          * be called in revalidate_it if we already have a lock, let's
995          * verify that. */
996         struct ldlm_res_id res_id;
997         struct lustre_handle lockh;
998         ldlm_policy_data_t policy;
999         ldlm_mode_t mode;
1000         ENTRY;
1001
1002         if (it->d.lustre.it_lock_handle) {
1003                 lockh.cookie = it->d.lustre.it_lock_handle;
1004                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1005         } else {
1006                 fid_build_reg_res_name(fid, &res_id);
1007                 switch (it->it_op) {
1008                 case IT_GETATTR:
1009                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1010                         break;
1011                 case IT_LAYOUT:
1012                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1013                         break;
1014                 default:
1015                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1016                         break;
1017                 }
1018                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1019                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1020                                        LDLM_IBITS, &policy,
1021                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1022         }
1023
1024         if (mode) {
1025                 it->d.lustre.it_lock_handle = lockh.cookie;
1026                 it->d.lustre.it_lock_mode = mode;
1027         } else {
1028                 it->d.lustre.it_lock_handle = 0;
1029                 it->d.lustre.it_lock_mode = 0;
1030         }
1031
1032         RETURN(!!mode);
1033 }
1034
1035 /*
1036  * This long block is all about fixing up the lock and request state
1037  * so that it is correct as of the moment _before_ the operation was
1038  * applied; that way, the VFS will think that everything is normal and
1039  * call Lustre's regular VFS methods.
1040  *
1041  * If we're performing a creation, that means that unless the creation
1042  * failed with EEXIST, we should fake up a negative dentry.
1043  *
1044  * For everything else, we want to lookup to succeed.
1045  *
1046  * One additional note: if CREATE or OPEN succeeded, we add an extra
1047  * reference to the request because we need to keep it around until
1048  * ll_create/ll_open gets called.
1049  *
1050  * The server will return to us, in it_disposition, an indication of
1051  * exactly what d.lustre.it_status refers to.
1052  *
1053  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1054  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1055  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1056  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1057  * was successful.
1058  *
1059  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1060  * child lookup.
1061  */
1062 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1063                     void *lmm, int lmmsize, struct lookup_intent *it,
1064                     int lookup_flags, struct ptlrpc_request **reqp,
1065                     ldlm_blocking_callback cb_blocking,
1066                     __u64 extra_lock_flags)
1067 {
1068         struct lustre_handle lockh;
1069         int rc = 0;
1070         ENTRY;
1071         LASSERT(it);
1072
1073         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1074                ", intent: %s flags %#o\n", op_data->op_namelen,
1075                op_data->op_name, PFID(&op_data->op_fid2),
1076                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1077                it->it_flags);
1078
1079         lockh.cookie = 0;
1080         if (fid_is_sane(&op_data->op_fid2) &&
1081             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1082                 /* We could just return 1 immediately, but since we should only
1083                  * be called in revalidate_it if we already have a lock, let's
1084                  * verify that. */
1085                 it->d.lustre.it_lock_handle = 0;
1086                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1087                 /* Only return failure if it was not GETATTR by cfid
1088                    (from inode_revalidate) */
1089                 if (rc || op_data->op_namelen != 0)
1090                         RETURN(rc);
1091         }
1092
1093         /* lookup_it may be called only after revalidate_it has run, because
1094          * revalidate_it cannot return errors, only zero.  Returning zero causes
1095          * this call to lookup, which *can* return an error.
1096          *
1097          * We only want to execute the request associated with the intent one
1098          * time, however, so don't send the request again.  Instead, skip past
1099          * this and use the request from revalidate.  In this case, revalidate
1100          * never dropped its reference, so the refcounts are all OK */
1101         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1102                 struct ldlm_enqueue_info einfo =
1103                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1104                           ldlm_completion_ast, NULL, NULL, NULL };
1105
1106                 /* For case if upper layer did not alloc fid, do it now. */
1107                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1108                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1109                         if (rc < 0) {
1110                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1111                                 RETURN(rc);
1112                         }
1113                 }
1114                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1115                                  lmm, lmmsize, NULL, extra_lock_flags);
1116                 if (rc < 0)
1117                         RETURN(rc);
1118         } else if (!fid_is_sane(&op_data->op_fid2) ||
1119                    !(it->it_create_mode & M_CHECK_STALE)) {
1120                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1121                  * request referenced from this intent, saved for subsequent
1122                  * lookup.  This path is executed when we proceed to this
1123                  * lookup, so we clear DISP_ENQ_COMPLETE */
1124                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1125         }
1126         *reqp = it->d.lustre.it_data;
1127         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1128         RETURN(rc);
1129 }
1130
1131 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1132                                               struct ptlrpc_request *req,
1133                                               void *args, int rc)
1134 {
1135         struct mdc_getattr_args  *ga = args;
1136         struct obd_export       *exp = ga->ga_exp;
1137         struct md_enqueue_info   *minfo = ga->ga_minfo;
1138         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1139         struct lookup_intent     *it;
1140         struct lustre_handle     *lockh;
1141         struct obd_device       *obddev;
1142         __u64                flags = LDLM_FL_HAS_INTENT;
1143         ENTRY;
1144
1145         it    = &minfo->mi_it;
1146         lockh = &minfo->mi_lockh;
1147
1148         obddev = class_exp2obd(exp);
1149
1150         mdc_exit_request(&obddev->u.cli);
1151         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1152                 rc = -ETIMEDOUT;
1153
1154         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1155                                    &flags, NULL, 0, lockh, rc);
1156         if (rc < 0) {
1157                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1158                 mdc_clear_replay_flag(req, rc);
1159                 GOTO(out, rc);
1160         }
1161
1162         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1163         if (rc)
1164                 GOTO(out, rc);
1165
1166         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1167         EXIT;
1168
1169 out:
1170         OBD_FREE_PTR(einfo);
1171         minfo->mi_cb(req, minfo, rc);
1172         return 0;
1173 }
1174
1175 int mdc_intent_getattr_async(struct obd_export *exp,
1176                              struct md_enqueue_info *minfo,
1177                              struct ldlm_enqueue_info *einfo)
1178 {
1179         struct md_op_data       *op_data = &minfo->mi_data;
1180         struct lookup_intent    *it = &minfo->mi_it;
1181         struct ptlrpc_request   *req;
1182         struct mdc_getattr_args *ga;
1183         struct obd_device       *obddev = class_exp2obd(exp);
1184         struct ldlm_res_id       res_id;
1185         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1186          *     for statahead currently. Consider CMD in future, such two bits
1187          *     maybe managed by different MDS, should be adjusted then. */
1188         ldlm_policy_data_t       policy = {
1189                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1190                                                          MDS_INODELOCK_UPDATE }
1191                                  };
1192         int                   rc = 0;
1193         __u64               flags = LDLM_FL_HAS_INTENT;
1194         ENTRY;
1195
1196         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1197                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1198                ldlm_it2str(it->it_op), it->it_flags);
1199
1200         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1201         req = mdc_intent_getattr_pack(exp, it, op_data);
1202         if (!req)
1203                 RETURN(-ENOMEM);
1204
1205         rc = mdc_enter_request(&obddev->u.cli);
1206         if (rc != 0) {
1207                 ptlrpc_req_finished(req);
1208                 RETURN(rc);
1209         }
1210
1211         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1212                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1213         if (rc < 0) {
1214                 mdc_exit_request(&obddev->u.cli);
1215                 ptlrpc_req_finished(req);
1216                 RETURN(rc);
1217         }
1218
1219         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1220         ga = ptlrpc_req_async_args(req);
1221         ga->ga_exp = exp;
1222         ga->ga_minfo = minfo;
1223         ga->ga_einfo = einfo;
1224
1225         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1226         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1227
1228         RETURN(0);
1229 }