4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
39 # include <linux/module.h>
40 # include <linux/pagemap.h>
41 # include <linux/miscdevice.h>
42 # include <linux/init.h>
44 #include <lustre_acl.h>
45 #include <obd_class.h>
46 #include <lustre_dlm.h>
47 /* fid_res_name_eq() */
48 #include <lustre_fid.h>
49 #include <lprocfs_status.h>
50 #include "mdc_internal.h"
/*
 * Context carried with an asynchronous intent-getattr request.
 *
 * mdc_intent_getattr_async() stores it in the request's async-args area
 * and mdc_intent_getattr_async_interpret() reads it back when the reply
 * is processed.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;	/* export used to finish the enqueue */
	struct md_enqueue_info		*ga_minfo;	/* caller's enqueue info (intent, lock handle, callback) */
	struct ldlm_enqueue_info	*ga_einfo;	/* LDLM enqueue parameters; freed by the interpreter */
};
58 int it_disposition(struct lookup_intent *it, int flag)
60 return it->d.lustre.it_disposition & flag;
62 EXPORT_SYMBOL(it_disposition);
64 void it_set_disposition(struct lookup_intent *it, int flag)
66 it->d.lustre.it_disposition |= flag;
68 EXPORT_SYMBOL(it_set_disposition);
70 void it_clear_disposition(struct lookup_intent *it, int flag)
72 it->d.lustre.it_disposition &= ~flag;
74 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Report the intent status that applies to a given phase of an open.
 *
 * The disposition bits are tested in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For the first bit
 * found set, d.lustre.it_status is returned when phase is at least that
 * bit's value.  If none of the four bits is set, the disposition and
 * status are logged with CERROR.
 *
 * NOTE(review): the embedded line numbers skip values after each inner
 * "return" and after the CERROR, so the branches taken when phase is
 * smaller than the bit, and the final fall-through, are not visible in
 * this listing - confirm against the upstream file.
 */
76 int it_open_error(int phase, struct lookup_intent *it)
78 if (it_disposition(it, DISP_OPEN_OPEN)) {
79 if (phase >= DISP_OPEN_OPEN)
80 return it->d.lustre.it_status;
85 if (it_disposition(it, DISP_OPEN_CREATE)) {
86 if (phase >= DISP_OPEN_CREATE)
87 return it->d.lustre.it_status;
92 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
93 if (phase >= DISP_LOOKUP_EXECD)
94 return it->d.lustre.it_status;
99 if (it_disposition(it, DISP_IT_EXECD)) {
100 if (phase >= DISP_IT_EXECD)
101 return it->d.lustre.it_status;
105 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
106 it->d.lustre.it_status);
110 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource behind a lock handle and report the
 * lock's inodebits.
 *
 * The lock is looked up from lockh (asserted non-NULL).  With the
 * resource and lock locked: if the resource already has an lr_lvb_inode
 * that differs from data, that old inode is asserted to carry I_FREEING;
 * lr_lvb_inode is then set to the new inode and the lock's
 * l_policy_data.l_inodebits.bits are copied to *bits.
 *
 * NOTE(review): the rest of the parameter list, any guard around the
 * *bits store, the release of the lock reference and the return value
 * fall on lines missing from this listing - confirm upstream.
 */
112 /* this must be called on a lockh that is known to have a referenced lock */
113 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
116 struct ldlm_lock *lock;
117 struct inode *new_inode = data;
126 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
128 LASSERT(lock != NULL);
129 lock_res_and_lock(lock);
130 if (lock->l_resource->lr_lvb_inode &&
131 lock->l_resource->lr_lvb_inode != data) {
132 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
133 LASSERTF(old_inode->i_state & I_FREEING,
134 "Found existing inode %p/%lu/%u state %lu in lock: "
135 "setting data to %p/%lu/%u\n", old_inode,
136 old_inode->i_ino, old_inode->i_generation,
138 new_inode, new_inode->i_ino, new_inode->i_generation);
140 lock->l_resource->lr_lvb_inode = new_inode;
142 *bits = lock->l_policy_data.l_inodebits.bits;
144 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by fid.
 *
 * Builds the resource name from fid and calls ldlm_lock_match() on the
 * export's obd namespace with the caller's flags, type, policy and mode;
 * a matched lock's handle is written to lockh by that call and its
 * result is stored in rc.
 *
 * NOTE(review): the declaration of rc and the return statement are on
 * lines missing from this listing.
 */
150 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
151 const struct lu_fid *fid, ldlm_type_t type,
152 ldlm_policy_data_t *policy, ldlm_mode_t mode,
153 struct lustre_handle *lockh)
155 struct ldlm_res_id res_id;
159 fid_build_reg_res_name(fid, &res_id);
160 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
161 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by fid.
 *
 * Builds the resource name from fid and forwards policy, mode, flags and
 * opaque to ldlm_cli_cancel_unused_resource() on the obd's namespace.
 *
 * NOTE(review): the "mode" and "opaque" parameters used in the call are
 * declared on signature lines missing from this listing, as are the
 * declaration of rc and the return statement.
 */
165 int mdc_cancel_unused(struct obd_export *exp,
166 const struct lu_fid *fid,
167 ldlm_policy_data_t *policy,
169 ldlm_cancel_flags_t flags,
172 struct ldlm_res_id res_id;
173 struct obd_device *obd = class_exp2obd(exp);
178 fid_build_reg_res_name(fid, &res_id);
179 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
180 policy, mode, flags, opaque);
/*
 * Drop the inode pointer cached on the LDLM resource for fid.
 *
 * The export's namespace is asserted non-NULL, the resource is obtained
 * with ldlm_resource_get(), its lr_lvb_inode is set to NULL and the
 * resource reference is released.
 *
 * NOTE(review): handling of a failed ldlm_resource_get(), any locking
 * around the store, and the return value are on lines missing from this
 * listing - confirm upstream.
 */
184 int mdc_null_inode(struct obd_export *exp,
185 const struct lu_fid *fid)
187 struct ldlm_res_id res_id;
188 struct ldlm_resource *res;
189 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
192 LASSERTF(ns != NULL, "no namespace passed\n");
194 fid_build_reg_res_name(fid, &res_id);
196 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
201 res->lr_lvb_inode = NULL;
204 ldlm_resource_putref(res);
/*
 * Iterate over the locks of the resource named by fid, passing each to
 * the caller's iterator.
 *
 * Builds the resource name from fid and calls ldlm_resource_iterate() on
 * the export's obd namespace; the result is then compared against
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the end of the original leading comment, the remaining
 * arguments of the iterate call and the values assigned in the two
 * branches are on lines missing from this listing, so the return
 * convention cannot be stated from here.
 */
208 /* find any ldlm lock of the inode in mdc
212 int mdc_find_cbdata(struct obd_export *exp,
213 const struct lu_fid *fid,
214 ldlm_iterator_t it, void *data)
216 struct ldlm_res_id res_id;
220 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
221 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223 if (rc == LDLM_ITER_STOP)
225 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Stop a request from being kept for replay.
 *
 * When req->rq_replay is set, rq_lock is taken and released around an
 * update that is on a line missing from this listing (presumably the
 * clearing of rq_replay - TODO confirm).  Independently, if rc is
 * non-zero while the request carries a transno, that unexpected
 * combination is reported with DEBUG_REQ(D_ERROR).
 */
230 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 /* Don't hold error requests for replay. */
233 if (req->rq_replay) {
234 spin_lock(&req->rq_lock);
236 spin_unlock(&req->rq_lock);
238 if (rc && req->rq_transno != 0) {
239 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * Grow the open request's buffer at segment DLM_INTENT_REC_OFF + 4 via
 * sptlrpc_cli_enlarge_reqbuf().  The CERROR path also drops
 * OBD_MD_FLEASIZE from body->valid and zeroes body->eadatasize.
 *
 * NOTE(review): the size argument of the enlarge call and the test that
 * selects the error path are on lines missing from this listing; the
 * error path is presumably taken when rc is non-zero - TODO confirm.
 */
244 /* Save a large LOV EA into the request buffer so that it is available
245 * for replay. We don't do this in the initial request because the
246 * original request doesn't need this buffer (at most it sends just the
247 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
248 * buffer and may also be difficult to allocate and save a very large
249 * request buffer for each open. (bug 5707)
251 * OOM here may cause recovery failure if lmm is needed (only for the
252 * original open if the MDS crashed just when this client also OOM'd)
253 * but this is incredibly unlikely, and questionable whether the client
254 * could do MDS recovery under OOM anyways... */
255 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
256 struct mdt_body *body)
260 /* FIXME: remove this explicit offset. */
261 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
264 CERROR("Can't enlarge segment %d size to %d\n",
265 DLM_INTENT_REC_OFF + 4, body->eadatasize);
266 body->valid &= ~OBD_MD_FLEASIZE;
267 body->eadatasize = 0;
/*
 * Allocate and pack an LDLM intent request for an open
 * (RQF_LDLM_INTENT_OPEN).
 *
 * Visible steps:
 *  - force the file-type bits of it->it_create_mode to S_IFREG;
 *  - when op_fid2 is sane, collect unused locks on it for early cancel
 *    (the lock mode chosen from it_flags is on lines not shown here);
 *  - add unused locks on op_fid1 covering MDS_INODELOCK_UPDATE to the
 *    same cancel list (what the IT_CREAT test just before it selects is
 *    not visible - TODO confirm);
 *  - size the capability, name and EA buffers; the EA buffer is the
 *    larger of lmmsize and the client's cl_default_mds_easize;
 *  - ldlm_prep_enqueue_req() with the collected cancels;
 *  - set rq_replay from the import's imp_replayable under rq_lock;
 *  - store it_op in the ldlm_intent and pack the open body;
 *  - for remote clients reserve sizeof(struct mdt_remote_perm) in the
 *    reply's RMF_ACL buffer, then set the reply length.
 *
 * The ldlm_lock_list_put()/ERR_PTR(-ENOMEM) pair follows the request
 * allocation and is presumably its failure path.
 *
 * NOTE(review): the last parameter(s), several local declarations, the
 * failure tests and the final return are on lines missing from this
 * listing.
 */
271 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
272 struct lookup_intent *it,
273 struct md_op_data *op_data,
274 void *lmm, int lmmsize,
277 struct ptlrpc_request *req;
278 struct obd_device *obddev = class_exp2obd(exp);
279 struct ldlm_intent *lit;
286 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
288 /* XXX: openlock is not cancelled for cross-refs. */
289 /* If inode is known, cancel conflicting OPEN locks. */
290 if (fid_is_sane(&op_data->op_fid2)) {
291 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
294 else if (it->it_flags & FMODE_EXEC)
299 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
304 /* If CREATE, cancel parent's UPDATE lock. */
305 if (it->it_op & IT_CREAT)
309 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
311 MDS_INODELOCK_UPDATE);
313 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
314 &RQF_LDLM_INTENT_OPEN);
316 ldlm_lock_list_put(&cancels, l_bl_ast, count);
317 RETURN(ERR_PTR(-ENOMEM));
320 /* parent capability */
321 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
322 /* child capability, reserve the size according to parent capa, it will
323 * be filled after we get the reply */
324 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
326 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
327 op_data->op_namelen + 1);
328 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
329 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
331 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
333 ptlrpc_request_free(req);
337 spin_lock(&req->rq_lock);
338 req->rq_replay = req->rq_import->imp_replayable;
339 spin_unlock(&req->rq_lock);
341 /* pack the intent */
342 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
343 lit->opc = (__u64)it->it_op;
345 /* pack the intended request */
346 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
349 /* for remote client, fetch remote perm for current user */
350 if (client_is_remote(exp))
351 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
352 sizeof(struct mdt_remote_perm));
353 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an LDLM intent request for an unlink
 * (RQF_LDLM_INTENT_UNLINK).
 *
 * Sizes the parent capability and name buffers, prepares the enqueue
 * request without early-cancel locks, stores it_op in the ldlm_intent
 * and packs the unlink body.  The reply reserves cl_max_mds_easize for
 * RMF_MDT_MD and cl_max_mds_cookiesize for RMF_ACL.
 *
 * NOTE(review): the allocation/prepare failure tests and the final
 * return are on lines missing from this listing; ERR_PTR(-ENOMEM) is
 * presumably returned when the allocation fails.
 */
357 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
358 struct lookup_intent *it,
359 struct md_op_data *op_data)
361 struct ptlrpc_request *req;
362 struct obd_device *obddev = class_exp2obd(exp);
363 struct ldlm_intent *lit;
367 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
368 &RQF_LDLM_INTENT_UNLINK);
370 RETURN(ERR_PTR(-ENOMEM));
372 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
373 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
374 op_data->op_namelen + 1);
376 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
378 ptlrpc_request_free(req);
382 /* pack the intent */
383 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
384 lit->opc = (__u64)it->it_op;
386 /* pack the intended request */
387 mdc_unlink_pack(req, op_data);
389 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
390 obddev->u.cli.cl_max_mds_easize);
391 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
392 obddev->u.cli.cl_max_mds_cookiesize);
393 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an LDLM intent request for getattr/lookup
 * (RQF_LDLM_INTENT_GETATTR).
 *
 * The requested attribute mask always includes GETATTR, EASIZE,
 * MODEASIZE, DIREA, MDSCAPA and MEA, plus RMTPERM for remote clients or
 * ACL otherwise.  After sizing the capability and name buffers and
 * preparing the enqueue request, it_op is stored in the ldlm_intent and
 * the getattr body is packed with cl_max_mds_easize.  The reply reserves
 * cl_max_mds_easize for RMF_MDT_MD and, for remote clients,
 * sizeof(struct mdt_remote_perm) for RMF_ACL.
 *
 * NOTE(review): the allocation/prepare failure tests and the final
 * return are on lines missing from this listing.
 */
397 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
398 struct lookup_intent *it,
399 struct md_op_data *op_data)
401 struct ptlrpc_request *req;
402 struct obd_device *obddev = class_exp2obd(exp);
403 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
404 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
405 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
406 (client_is_remote(exp) ?
407 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
408 struct ldlm_intent *lit;
412 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
413 &RQF_LDLM_INTENT_GETATTR);
415 RETURN(ERR_PTR(-ENOMEM));
417 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
418 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
419 op_data->op_namelen + 1);
421 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
423 ptlrpc_request_free(req);
427 /* pack the intent */
428 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
429 lit->opc = (__u64)it->it_op;
431 /* pack the intended request */
432 mdc_getattr_pack(req, valid, it->it_flags, op_data,
433 obddev->u.cli.cl_max_mds_easize);
435 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
436 obddev->u.cli.cl_max_mds_easize);
437 if (client_is_remote(exp))
438 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
439 sizeof(struct mdt_remote_perm));
440 ptlrpc_request_set_replen(req);
/*
 * Allocate and pack an LDLM intent request for a layout lock
 * (RQF_LDLM_INTENT_LAYOUT).
 *
 * The client EA buffer is sized to 0, the enqueue request is prepared
 * without early-cancel locks, it_op is stored in the ldlm_intent and the
 * layout intent opcode is set to LAYOUT_INTENT_ACCESS.  The reply
 * reserves cl_max_mds_easize bytes for RMF_DLM_LVB.  The md_op_data
 * argument is not used.
 *
 * NOTE(review): the allocation/prepare failure tests and the final
 * return are on lines missing from this listing.
 */
444 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
445 struct lookup_intent *it,
446 struct md_op_data *unused)
448 struct obd_device *obd = class_exp2obd(exp);
449 struct ptlrpc_request *req;
450 struct ldlm_intent *lit;
451 struct layout_intent *layout;
455 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
456 &RQF_LDLM_INTENT_LAYOUT);
458 RETURN(ERR_PTR(-ENOMEM));
460 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
461 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
463 ptlrpc_request_free(req);
467 /* pack the intent */
468 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469 lit->opc = (__u64)it->it_op;
471 /* pack the layout intent request */
472 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
473 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
474 * set for replication */
475 layout->li_opc = LAYOUT_INTENT_ACCESS;
477 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
478 obd->u.cli.cl_max_mds_easize);
479 ptlrpc_request_set_replen(req);
/*
 * Allocate and prepare a plain LDLM enqueue request (RQF_LDLM_ENQUEUE,
 * no intent payload), reserving lvb_len bytes for RMF_DLM_LVB in the
 * reply.
 *
 * NOTE(review): the allocation/prepare failure tests and the final
 * return are on lines missing from this listing.
 */
483 static struct ptlrpc_request *
484 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
486 struct ptlrpc_request *req;
490 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
492 RETURN(ERR_PTR(-ENOMEM));
494 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
496 ptlrpc_request_free(req);
500 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
501 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue and fill in the intent.
 *
 * Visible steps:
 *  - if the request has a transno or is marked for replay, set
 *    LDLM_FL_INTENT_ONLY in the request's lock flags;
 *  - rc == ELDLM_LOCK_ABORTED: zero the lock handle; otherwise look the
 *    lock up and, if the server granted a different mode, move the
 *    reference to that mode and update einfo->ei_mode;
 *  - copy disposition and status from the reply (lock_policy_res1/res2)
 *    into the intent, together with lock mode, handle cookie and req;
 *  - clear the replay flag when the request is marked for replay but
 *    has no transno or a negative status, and again for an IT_OPEN that
 *    did not open successfully;
 *  - for IT_OPEN/IT_UNLINK/IT_LOOKUP/IT_GETATTR: fetch the mdt_body,
 *    register open replay data for a successful open, validate the EA
 *    and (for a replayable open) copy it into the request's RMF_EADATA
 *    buffer, then fetch remote permissions and capabilities as flagged
 *    in body->valid;
 *  - for IT_LAYOUT: take the layout from the reply's RMF_DLM_LVB;
 *  - if the lock has a layout and LVB data was found, copy it to a new
 *    buffer and install it as l_lvb_data when the lock has none yet.
 *
 * NOTE(review): this listing has lost many short lines here (the last
 * parameter, error returns, several conditions, some comment
 * delimiters, the guard on the final OBD_FREE_LARGE and the return).
 * Treat the summary as a guide to the visible statements only and
 * confirm details upstream.
 */
505 static int mdc_finish_enqueue(struct obd_export *exp,
506 struct ptlrpc_request *req,
507 struct ldlm_enqueue_info *einfo,
508 struct lookup_intent *it,
509 struct lustre_handle *lockh,
512 struct req_capsule *pill = &req->rq_pill;
513 struct ldlm_request *lockreq;
514 struct ldlm_reply *lockrep;
515 struct lustre_intent_data *intent = &it->d.lustre;
516 struct ldlm_lock *lock;
517 void *lvb_data = NULL;
522 /* Similarly, if we're going to replay this request, we don't want to
523 * actually get a lock, just perform the intent. */
524 if (req->rq_transno || req->rq_replay) {
525 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
526 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
529 if (rc == ELDLM_LOCK_ABORTED) {
531 memset(lockh, 0, sizeof(*lockh));
533 } else { /* rc = 0 */
534 lock = ldlm_handle2lock(lockh);
535 LASSERT(lock != NULL);
537 /* If the server gave us back a different lock mode, we should
538 * fix up our variables. */
539 if (lock->l_req_mode != einfo->ei_mode) {
540 ldlm_lock_addref(lockh, lock->l_req_mode);
541 ldlm_lock_decref(lockh, einfo->ei_mode);
542 einfo->ei_mode = lock->l_req_mode;
547 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
548 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
550 intent->it_disposition = (int)lockrep->lock_policy_res1;
551 intent->it_status = (int)lockrep->lock_policy_res2;
552 intent->it_lock_mode = einfo->ei_mode;
553 intent->it_lock_handle = lockh->cookie;
554 intent->it_data = req;
556 /* Technically speaking rq_transno must already be zero if
557 * it_status is in error, so the check is a bit redundant */
558 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
559 mdc_clear_replay_flag(req, intent->it_status);
561 /* If we're doing an IT_OPEN which did not result in an actual
562 * successful open, then we need to remove the bit which saves
563 * this request for unconditional replay.
565 * It's important that we do this first! Otherwise we might exit the
566 * function without doing so, and try to replay a failed create
568 if (it->it_op & IT_OPEN && req->rq_replay &&
569 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
570 mdc_clear_replay_flag(req, intent->it_status);
572 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
573 it->it_op, intent->it_disposition, intent->it_status);
575 /* We know what to expect, so we do any byte flipping required here */
576 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
577 struct mdt_body *body;
579 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
581 CERROR ("Can't swab mdt_body\n");
585 if (it_disposition(it, DISP_OPEN_OPEN) &&
586 !it_open_error(DISP_OPEN_OPEN, it)) {
588 * If this is a successful OPEN request, we need to set
589 * replay handler and data early, so that if replay
590 * happens immediately after swabbing below, new reply
591 * is swabbed by that handler correctly.
593 mdc_set_open_replay_data(NULL, NULL, req);
596 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
599 mdc_update_max_ea_from_body(exp, body);
602 * The eadata is opaque; just check that it is there.
603 * Eventually, obd_unpackmd() will check the contents.
605 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
610 /* save lvb data and length in case this is for layout
613 lvb_len = body->eadatasize;
616 * We save the reply LOV EA in case we have to replay a
617 * create for recovery. If we didn't allocate a large
618 * enough request buffer above we need to reallocate it
619 * here to hold the actual LOV EA.
621 * To not save LOV EA if request is not going to replay
622 * (for example error one).
624 if ((it->it_op & IT_OPEN) && req->rq_replay) {
626 if (req_capsule_get_size(pill, &RMF_EADATA,
629 mdc_realloc_openmsg(req, body);
631 req_capsule_shrink(pill, &RMF_EADATA,
635 req_capsule_set_size(pill, &RMF_EADATA,
639 lmm = req_capsule_client_get(pill, &RMF_EADATA);
641 memcpy(lmm, eadata, body->eadatasize);
645 if (body->valid & OBD_MD_FLRMTPERM) {
646 struct mdt_remote_perm *perm;
648 LASSERT(client_is_remote(exp));
649 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
650 lustre_swab_mdt_remote_perm);
654 if (body->valid & OBD_MD_FLMDSCAPA) {
655 struct lustre_capa *capa, *p;
657 capa = req_capsule_server_get(pill, &RMF_CAPA1);
661 if (it->it_op & IT_OPEN) {
662 /* client fid capa will be checked in replay */
663 p = req_capsule_client_get(pill, &RMF_CAPA2);
668 if (body->valid & OBD_MD_FLOSSCAPA) {
669 struct lustre_capa *capa;
671 capa = req_capsule_server_get(pill, &RMF_CAPA2);
675 } else if (it->it_op & IT_LAYOUT) {
676 /* maybe the lock was granted right away and layout
677 * is packed into RMF_DLM_LVB of req */
678 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
680 lvb_data = req_capsule_server_sized_get(pill,
681 &RMF_DLM_LVB, lvb_len);
682 if (lvb_data == NULL)
687 /* fill in stripe data for layout lock */
688 lock = ldlm_handle2lock(lockh);
689 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
692 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
693 ldlm_it2str(it->it_op), lvb_len);
695 OBD_ALLOC_LARGE(lmm, lvb_len);
700 memcpy(lmm, lvb_data, lvb_len);
702 /* install lvb_data */
703 lock_res_and_lock(lock);
704 if (lock->l_lvb_data == NULL) {
705 lock->l_lvb_data = lmm;
706 lock->l_lvb_len = lvb_len;
709 unlock_res_and_lock(lock);
711 OBD_FREE_LARGE(lmm, lvb_len);
/*
 * Enqueue an LDLM lock on the resource of op_data->op_fid1, optionally
 * carrying an intent.
 *
 * Visible behaviour:
 *  - the inodebits policy defaults to LOOKUP; UPDATE is used for
 *    IT_UNLINK/IT_GETATTR/IT_READDIR and LAYOUT for IT_LAYOUT (IT_OPEN
 *    also switches to UPDATE when its request is packed);
 *  - the request is packed according to it_op: open, unlink,
 *    getattr/lookup, readdir (plain enqueue, lvb_len 0) or layout
 *    (lvb_type LVB_T_LAYOUT);
 *  - the path that asserts LDLM_FLOCK takes its policy from lmm (with
 *    lmmsize asserted 0) and tags res_id.name[3] with LDLM_FLOCK;
 *  - for IT_CREAT, rq_no_retry_einprogress is set on the request;
 *  - the rpc lock and mdc_enter_request() are taken before
 *    ldlm_cli_enqueue() and released afterwards;
 *  - a reply whose lock_policy_res2 is -EINPROGRESS for IT_CREAT causes
 *    the request to be finished; per the comment the create is retried;
 *  - finally mdc_finish_enqueue() fills in the intent; the decref /
 *    zero-handle / finish-request sequence after it is presumably its
 *    error path - TODO confirm.
 *
 * NOTE(review): labels, gotos, the resend counter updates, most guards
 * ("if (it)", error tests) and the returns are on lines missing from
 * this listing, as are the ends of two comments.  Confirm control flow
 * upstream before relying on this summary.
 */
719 /* We always reserve enough space in the reply packet for a stripe MD, because
720 * we don't know in advance the file type. */
721 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
722 struct lookup_intent *it, struct md_op_data *op_data,
723 struct lustre_handle *lockh, void *lmm, int lmmsize,
724 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
726 struct obd_device *obddev = class_exp2obd(exp);
727 struct ptlrpc_request *req = NULL;
728 __u64 flags, saved_flags = extra_lock_flags;
730 struct ldlm_res_id res_id;
731 static const ldlm_policy_data_t lookup_policy =
732 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
733 static const ldlm_policy_data_t update_policy =
734 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
735 static const ldlm_policy_data_t layout_policy =
736 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
737 ldlm_policy_data_t const *policy = &lookup_policy;
738 int generation, resends = 0;
739 struct ldlm_reply *lockrep;
740 enum lvb_type lvb_type = 0;
743 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
746 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
749 saved_flags |= LDLM_FL_HAS_INTENT;
750 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
751 policy = &update_policy;
752 else if (it->it_op & IT_LAYOUT)
753 policy = &layout_policy;
756 LASSERT(reqp == NULL);
758 generation = obddev->u.cli.cl_import->imp_generation;
762 /* The only way right now is FLOCK, in this case we hide flock
763 policy as lmm, but lmmsize is 0 */
764 LASSERT(lmm && lmmsize == 0);
765 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
767 policy = (ldlm_policy_data_t *)lmm;
768 res_id.name[3] = LDLM_FLOCK;
769 } else if (it->it_op & IT_OPEN) {
770 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
772 policy = &update_policy;
773 einfo->ei_cbdata = NULL;
775 } else if (it->it_op & IT_UNLINK) {
776 req = mdc_intent_unlink_pack(exp, it, op_data);
777 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
778 req = mdc_intent_getattr_pack(exp, it, op_data);
779 } else if (it->it_op & IT_READDIR) {
780 req = mdc_enqueue_pack(exp, 0);
781 } else if (it->it_op & IT_LAYOUT) {
782 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
785 req = mdc_intent_layout_pack(exp, it, op_data);
786 lvb_type = LVB_T_LAYOUT;
793 RETURN(PTR_ERR(req));
795 if (req != NULL && it && it->it_op & IT_CREAT)
796 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
798 req->rq_no_retry_einprogress = 1;
801 req->rq_generation_set = 1;
802 req->rq_import_generation = generation;
803 req->rq_sent = cfs_time_current_sec() + resends;
806 /* It is important to obtain rpc_lock first (if applicable), so that
807 * threads that are serialised with rpc_lock are not polluting our
808 * rpcs in flight counter. We do not do flock request limiting, though*/
810 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
811 rc = mdc_enter_request(&obddev->u.cli);
813 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
814 mdc_clear_replay_flag(req, 0);
815 ptlrpc_req_finished(req);
820 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
821 0, lvb_type, lockh, 0);
823 /* For flock requests we immediately return without further
824 delay and let caller deal with the rest, since rest of
825 this function metadata processing makes no sense for flock
830 mdc_exit_request(&obddev->u.cli);
831 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
834 CERROR("ldlm_cli_enqueue: %d\n", rc);
835 mdc_clear_replay_flag(req, rc);
836 ptlrpc_req_finished(req);
840 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
841 LASSERT(lockrep != NULL);
843 /* Retry the create infinitely when we get -EINPROGRESS from
844 * server. This is required by the new quota design. */
845 if (it && it->it_op & IT_CREAT &&
846 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
847 mdc_clear_replay_flag(req, rc);
848 ptlrpc_req_finished(req);
851 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
852 obddev->obd_name, resends, it->it_op,
853 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
855 if (generation == obddev->u.cli.cl_import->imp_generation) {
858 CDEBUG(D_HA, "resend cross eviction\n");
863 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
865 if (lustre_handle_is_used(lockh)) {
866 ldlm_lock_decref(lockh, einfo->ei_mode);
867 memset(lockh, 0, sizeof(*lockh));
869 ptlrpc_req_finished(req);
/*
 * Finish an intent after its enqueue reply has been processed:
 * interpret the disposition, take the extra request references that
 * later create/open steps rely on, and prefer an already-held matching
 * lock over the newly granted one.
 *
 * Visible steps:
 *  - sanity asserts on the request pointer and reply message;
 *  - without DISP_IT_EXECD, it_status is asserted non-zero and returned;
 *  - rc is taken from it_open_error(DISP_IT_EXECD) and later from
 *    it_open_error(DISP_LOOKUP_EXECD);
 *  - when revalidating (sane op_fid2, M_CHECK_STALE, not IT_GETATTR)
 *    DISP_ENQ_COMPLETE is set, and a reply fid matching neither op_fid2
 *    nor op_fid3 is logged as stale;
 *  - one extra request reference each is taken for a successful create
 *    and a successful open, guarded by DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF so it happens once;
 *  - the new lock's resource name is asserted to match the reply fid;
 *    if ldlm_lock_match() finds a lock with the same policy, the new
 *    lock is dropped with decref-and-cancel and the handle (also in the
 *    intent) is replaced by the existing one.
 *
 * NOTE(review): error tests after the it_open_error() calls, the return
 * for the stale case, the guard on the looked-up lock, release of the
 * lock reference and the final return are on lines missing from this
 * listing.
 */
874 static int mdc_finish_intent_lock(struct obd_export *exp,
875 struct ptlrpc_request *request,
876 struct md_op_data *op_data,
877 struct lookup_intent *it,
878 struct lustre_handle *lockh)
880 struct lustre_handle old_lock;
881 struct mdt_body *mdt_body;
882 struct ldlm_lock *lock;
886 LASSERT(request != NULL);
887 LASSERT(request != LP_POISON);
888 LASSERT(request->rq_repmsg != LP_POISON);
890 if (!it_disposition(it, DISP_IT_EXECD)) {
891 /* The server failed before it even started executing the
892 * intent, i.e. because it couldn't unpack the request. */
893 LASSERT(it->d.lustre.it_status != 0);
894 RETURN(it->d.lustre.it_status);
896 rc = it_open_error(DISP_IT_EXECD, it);
900 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
901 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
903 /* If we were revalidating a fid/name pair, mark the intent in
904 * case we fail and get called again from lookup */
905 if (fid_is_sane(&op_data->op_fid2) &&
906 it->it_create_mode & M_CHECK_STALE &&
907 it->it_op != IT_GETATTR) {
908 it_set_disposition(it, DISP_ENQ_COMPLETE);
910 /* Also: did we find the same inode? */
911 /* server can return one of two fids:
912 * op_fid2 - new allocated fid - if file is created.
913 * op_fid3 - existent fid - if file only open.
914 * op_fid3 is saved in lmv_intent_open */
915 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
916 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): op_fid2 is printed twice below although the test above
 * compares both op_fid2 and op_fid3 - the second PFID() may have been
 * meant to be op_fid3; confirm before changing. */
917 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
918 "\n", PFID(&op_data->op_fid2),
919 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
924 rc = it_open_error(DISP_LOOKUP_EXECD, it);
928 /* keep requests around for the multiple phases of the call
929 * this shows the DISP_XX must guarantee we make it into the call
931 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
932 it_disposition(it, DISP_OPEN_CREATE) &&
933 !it_open_error(DISP_OPEN_CREATE, it)) {
934 it_set_disposition(it, DISP_ENQ_CREATE_REF);
935 ptlrpc_request_addref(request); /* balanced in ll_create_node */
937 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
938 it_disposition(it, DISP_OPEN_OPEN) &&
939 !it_open_error(DISP_OPEN_OPEN, it)) {
940 it_set_disposition(it, DISP_ENQ_OPEN_REF);
941 ptlrpc_request_addref(request); /* balanced in ll_file_open */
942 /* BUG 11546 - eviction in the middle of open rpc processing */
943 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
946 if (it->it_op & IT_CREAT) {
947 /* XXX this belongs in ll_create_it */
948 } else if (it->it_op == IT_OPEN) {
949 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
951 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
954 /* If we already have a matching lock, then cancel the new
955 * one. We have to set the data here instead of in
956 * mdc_enqueue, because we need to use the child's inode as
957 * the l_ast_data to match, and that's not available until
958 * intent_finish has performed the iget().) */
959 lock = ldlm_handle2lock(lockh);
961 ldlm_policy_data_t policy = lock->l_policy_data;
962 LDLM_DEBUG(lock, "matching against this");
964 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
965 &lock->l_resource->lr_name),
966 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
967 (unsigned long)lock->l_resource->lr_name.name[0],
968 (unsigned long)lock->l_resource->lr_name.name[1],
969 (unsigned long)lock->l_resource->lr_name.name[2],
970 (unsigned long)fid_seq(&mdt_body->fid1),
971 (unsigned long)fid_oid(&mdt_body->fid1),
972 (unsigned long)fid_ver(&mdt_body->fid1));
975 memcpy(&old_lock, lockh, sizeof(*lockh));
976 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
977 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
978 ldlm_lock_decref_and_cancel(lockh,
979 it->d.lustre.it_lock_mode);
980 memcpy(lockh, &old_lock, sizeof(old_lock));
981 it->d.lustre.it_lock_handle = lockh->cookie;
984 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
985 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
986 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether the client still holds a usable lock for an intent.
 *
 * If the intent already records a lock handle, that handle is
 * revalidated with ldlm_revalidate_lock_handle().  The other visible
 * path builds the resource name from fid, picks the inodebits by it_op
 * (UPDATE, LAYOUT or LOOKUP) and calls ldlm_lock_match() for modes
 * CR|CW|PR|PW with LDLM_FL_BLOCK_GRANTED.  The intent's lock handle and
 * mode are then either set from the result or both zeroed.
 *
 * NOTE(review): the "else", the case labels of the switch, the test
 * that chooses between storing and zeroing, and the return value are on
 * lines missing from this listing; the leading comment has also lost
 * its closing line.
 */
990 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
991 struct lu_fid *fid, __u64 *bits)
993 /* We could just return 1 immediately, but since we should only
994 * be called in revalidate_it if we already have a lock, let's
996 struct ldlm_res_id res_id;
997 struct lustre_handle lockh;
998 ldlm_policy_data_t policy;
1002 if (it->d.lustre.it_lock_handle) {
1003 lockh.cookie = it->d.lustre.it_lock_handle;
1004 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1006 fid_build_reg_res_name(fid, &res_id);
1007 switch (it->it_op) {
1009 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1012 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1015 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1018 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1019 LDLM_FL_BLOCK_GRANTED, &res_id,
1020 LDLM_IBITS, &policy,
1021 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1025 it->d.lustre.it_lock_handle = lockh.cookie;
1026 it->d.lustre.it_lock_mode = mode;
1028 it->d.lustre.it_lock_handle = 0;
1029 it->d.lustre.it_lock_mode = 0;
/*
 * Summary of the visible flow of mdc_intent_lock():
 *  - for IT_LOOKUP/IT_GETATTR with a sane op_fid2, the intent's lock
 *    handle is reset and mdc_revalidate_lock() is tried first; its
 *    result is acted on when rc is non-zero or a name was supplied;
 *  - unless DISP_ENQ_COMPLETE is already set, an IBITS enqueue is built
 *    (mode from it_to_lock_mode(), blocking callback from the caller,
 *    ldlm_completion_ast as completion callback), a fid is allocated
 *    for IT_CREAT when op_fid2 is not sane, and mdc_enqueue() is called;
 *  - otherwise, if op_fid2 is not sane or M_CHECK_STALE is not set,
 *    DISP_ENQ_COMPLETE is cleared;
 *  - *reqp is set to the request stored in the intent and
 *    mdc_finish_intent_lock() completes the operation.
 *
 * NOTE(review): the opening and closing lines of the original header
 * comment below, the error tests after mdc_revalidate_lock(),
 * mdc_fid_alloc() and mdc_enqueue(), and the returns are on lines
 * missing from this listing.
 */
1036 * This long block is all about fixing up the lock and request state
1037 * so that it is correct as of the moment _before_ the operation was
1038 * applied; that way, the VFS will think that everything is normal and
1039 * call Lustre's regular VFS methods.
1041 * If we're performing a creation, that means that unless the creation
1042 * failed with EEXIST, we should fake up a negative dentry.
1044 * For everything else, we want to lookup to succeed.
1046 * One additional note: if CREATE or OPEN succeeded, we add an extra
1047 * reference to the request because we need to keep it around until
1048 * ll_create/ll_open gets called.
1050 * The server will return to us, in it_disposition, an indication of
1051 * exactly what d.lustre.it_status refers to.
1053 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1054 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1055 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1056 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1059 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1062 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1063 void *lmm, int lmmsize, struct lookup_intent *it,
1064 int lookup_flags, struct ptlrpc_request **reqp,
1065 ldlm_blocking_callback cb_blocking,
1066 __u64 extra_lock_flags)
1068 struct lustre_handle lockh;
1073 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1074 ", intent: %s flags %#o\n", op_data->op_namelen,
1075 op_data->op_name, PFID(&op_data->op_fid2),
1076 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1080 if (fid_is_sane(&op_data->op_fid2) &&
1081 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1082 /* We could just return 1 immediately, but since we should only
1083 * be called in revalidate_it if we already have a lock, let's
1085 it->d.lustre.it_lock_handle = 0;
1086 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1087 /* Only return failure if it was not GETATTR by cfid
1088 (from inode_revalidate) */
1089 if (rc || op_data->op_namelen != 0)
1093 /* lookup_it may be called only after revalidate_it has run, because
1094 * revalidate_it cannot return errors, only zero. Returning zero causes
1095 * this call to lookup, which *can* return an error.
1097 * We only want to execute the request associated with the intent one
1098 * time, however, so don't send the request again. Instead, skip past
1099 * this and use the request from revalidate. In this case, revalidate
1100 * never dropped its reference, so the refcounts are all OK */
1101 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1102 struct ldlm_enqueue_info einfo =
1103 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1104 ldlm_completion_ast, NULL, NULL, NULL };
1106 /* For case if upper layer did not alloc fid, do it now. */
1107 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1108 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1110 CERROR("Can't alloc new fid, rc %d\n", rc);
1114 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1115 lmm, lmmsize, NULL, extra_lock_flags);
1118 } else if (!fid_is_sane(&op_data->op_fid2) ||
1119 !(it->it_create_mode & M_CHECK_STALE)) {
1120 /* DISP_ENQ_COMPLETE set means there is extra reference on
1121 * request referenced from this intent, saved for subsequent
1122 * lookup. This path is executed when we proceed to this
1123 * lookup, so we clear DISP_ENQ_COMPLETE */
1124 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1126 *reqp = it->d.lustre.it_data;
1127 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for an asynchronous intent getattr.
 *
 * Recovers export, enqueue info and md_enqueue_info from the request's
 * mdc_getattr_args, releases the in-flight slot with
 * mdc_exit_request(), completes the enqueue with
 * ldlm_cli_enqueue_fini(), then runs mdc_finish_enqueue() and
 * mdc_finish_intent_lock() on the intent.  Finally einfo is freed and
 * the caller's mi_cb callback is invoked with the resulting rc.
 *
 * NOTE(review): the remaining parameters, the assignment of "it", the
 * effect of the OBD_FAIL_CHECK branch, the error tests/gotos between
 * the steps and the return are on lines missing from this listing.
 */
1131 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1132 struct ptlrpc_request *req,
1135 struct mdc_getattr_args *ga = args;
1136 struct obd_export *exp = ga->ga_exp;
1137 struct md_enqueue_info *minfo = ga->ga_minfo;
1138 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1139 struct lookup_intent *it;
1140 struct lustre_handle *lockh;
1141 struct obd_device *obddev;
1142 __u64 flags = LDLM_FL_HAS_INTENT;
1146 lockh = &minfo->mi_lockh;
1148 obddev = class_exp2obd(exp);
1150 mdc_exit_request(&obddev->u.cli);
1151 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1154 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1155 &flags, NULL, 0, lockh, rc);
1157 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1158 mdc_clear_replay_flag(req, rc);
1162 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1166 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1170 OBD_FREE_PTR(einfo);
1171 minfo->mi_cb(req, minfo, rc);
/*
 * Start an asynchronous intent getattr.
 *
 * Packs a getattr intent request for minfo's op_data/intent, reserves
 * an in-flight slot with mdc_enter_request(), and issues
 * ldlm_cli_enqueue() in async mode (last argument 1) for a lock
 * covering both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE.  The
 * request's async args are filled with minfo and einfo,
 * mdc_intent_getattr_async_interpret() is installed as reply
 * interpreter, and the request is handed to ptlrpcd.
 *
 * NOTE(review): the error tests after packing, mdc_enter_request() and
 * ldlm_cli_enqueue(), the store of ga_exp, and the end of the function
 * (including its return) are not visible in this listing.
 */
1175 int mdc_intent_getattr_async(struct obd_export *exp,
1176 struct md_enqueue_info *minfo,
1177 struct ldlm_enqueue_info *einfo)
1179 struct md_op_data *op_data = &minfo->mi_data;
1180 struct lookup_intent *it = &minfo->mi_it;
1181 struct ptlrpc_request *req;
1182 struct mdc_getattr_args *ga;
1183 struct obd_device *obddev = class_exp2obd(exp);
1184 struct ldlm_res_id res_id;
1185 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1186 * for statahead currently. Consider CMD in future, such two bits
1187 * maybe managed by different MDS, should be adjusted then. */
1188 ldlm_policy_data_t policy = {
1189 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1190 MDS_INODELOCK_UPDATE }
1193 __u64 flags = LDLM_FL_HAS_INTENT;
1196 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1197 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1198 ldlm_it2str(it->it_op), it->it_flags);
1200 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1201 req = mdc_intent_getattr_pack(exp, it, op_data);
1205 rc = mdc_enter_request(&obddev->u.cli);
1207 ptlrpc_req_finished(req);
1211 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1212 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1214 mdc_exit_request(&obddev->u.cli);
1215 ptlrpc_req_finished(req);
1219 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1220 ga = ptlrpc_req_async_args(req);
1222 ga->ga_minfo = minfo;
1223 ga->ga_einfo = einfo;
1225 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1226 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);