4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
46 spinlock_t obd_types_lock;
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 struct obd_type *class_search_type(const char *name)
95 struct list_head *tmp;
96 struct obd_type *type;
98 spin_lock(&obd_types_lock);
99 list_for_each(tmp, &obd_types) {
100 type = list_entry(tmp, struct obd_type, typ_chain);
101 if (strcmp(type->typ_name, name) == 0) {
102 spin_unlock(&obd_types_lock);
106 spin_unlock(&obd_types_lock);
109 EXPORT_SYMBOL(class_search_type);
111 struct obd_type *class_get_type(const char *name)
113 struct obd_type *type = class_search_type(name);
116 const char *modname = name;
118 if (strcmp(modname, "obdfilter") == 0)
121 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
122 modname = LUSTRE_OSP_NAME;
124 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
125 modname = LUSTRE_MDT_NAME;
127 if (!request_module("%s", modname)) {
128 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
129 type = class_search_type(name);
131 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136 spin_lock(&type->obd_type_lock);
138 try_module_get(type->typ_dt_ops->o_owner);
139 spin_unlock(&type->obd_type_lock);
143 EXPORT_SYMBOL(class_get_type);
145 void class_put_type(struct obd_type *type)
148 spin_lock(&type->obd_type_lock);
150 module_put(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
153 EXPORT_SYMBOL(class_put_type);
155 #define CLASS_MAX_NAME 1024
157 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
159 struct lu_device_type *ldt)
161 struct obd_type *type;
165 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167 if (class_search_type(name)) {
168 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173 type = kzalloc(sizeof(*type), GFP_NOFS);
177 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
178 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
179 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
181 if (!type->typ_dt_ops ||
186 *(type->typ_dt_ops) = *dt_ops;
187 /* md_ops is optional */
189 *(type->typ_md_ops) = *md_ops;
190 strcpy(type->typ_name, name);
191 spin_lock_init(&type->obd_type_lock);
193 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
196 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
197 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
199 type->typ_debugfs_entry = NULL;
203 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
204 if (!type->typ_kobj) {
211 rc = lu_device_type_init(ldt);
216 spin_lock(&obd_types_lock);
217 list_add(&type->typ_chain, &obd_types);
218 spin_unlock(&obd_types_lock);
224 kobject_put(type->typ_kobj);
225 kfree(type->typ_name);
226 kfree(type->typ_md_ops);
227 kfree(type->typ_dt_ops);
231 EXPORT_SYMBOL(class_register_type);
233 int class_unregister_type(const char *name)
235 struct obd_type *type = class_search_type(name);
238 CERROR("unknown obd type\n");
242 if (type->typ_refcnt) {
243 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
244 /* This is a bad situation, let's make the best of it */
245 /* Remove ops, but leave the name for debugging */
246 kfree(type->typ_dt_ops);
247 kfree(type->typ_md_ops);
252 kobject_put(type->typ_kobj);
254 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
255 ldebugfs_remove(&type->typ_debugfs_entry);
258 lu_device_type_fini(type->typ_lu);
260 spin_lock(&obd_types_lock);
261 list_del(&type->typ_chain);
262 spin_unlock(&obd_types_lock);
263 kfree(type->typ_name);
264 kfree(type->typ_dt_ops);
265 kfree(type->typ_md_ops);
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
272 * Create a new obd device.
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
279 * \retval NULL if create fails, otherwise return the obd device
282 struct obd_device *class_newdev(const char *type_name, const char *name)
284 struct obd_device *result = NULL;
285 struct obd_device *newdev;
286 struct obd_type *type = NULL;
288 int new_obd_minor = 0;
290 if (strlen(name) >= MAX_OBD_NAME) {
291 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
292 return ERR_PTR(-EINVAL);
295 type = class_get_type(type_name);
297 CERROR("OBD: unknown type: %s\n", type_name);
298 return ERR_PTR(-ENODEV);
301 newdev = obd_device_alloc();
303 result = ERR_PTR(-ENOMEM);
307 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
309 write_lock(&obd_dev_lock);
310 for (i = 0; i < class_devno_max(); i++) {
311 struct obd_device *obd = class_num2obd(i);
313 if (obd && (strcmp(name, obd->obd_name) == 0)) {
314 CERROR("Device %s already exists at %d, won't add\n",
317 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318 "%p obd_magic %08x != %08x\n", result,
319 result->obd_magic, OBD_DEVICE_MAGIC);
320 LASSERTF(result->obd_minor == new_obd_minor,
321 "%p obd_minor %d != %d\n", result,
322 result->obd_minor, new_obd_minor);
324 obd_devs[result->obd_minor] = NULL;
325 result->obd_name[0] = '\0';
327 result = ERR_PTR(-EEXIST);
330 if (!result && !obd) {
332 result->obd_minor = i;
334 result->obd_type = type;
335 strncpy(result->obd_name, name,
336 sizeof(result->obd_name) - 1);
337 obd_devs[i] = result;
340 write_unlock(&obd_dev_lock);
342 if (!result && i >= class_devno_max()) {
343 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
345 result = ERR_PTR(-EOVERFLOW);
352 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
353 result->obd_name, result);
357 obd_device_free(newdev);
359 class_put_type(type);
363 void class_release_dev(struct obd_device *obd)
365 struct obd_type *obd_type = obd->obd_type;
367 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
368 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
369 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
370 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
371 LASSERT(obd_type != NULL);
373 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
374 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
376 write_lock(&obd_dev_lock);
377 obd_devs[obd->obd_minor] = NULL;
378 write_unlock(&obd_dev_lock);
379 obd_device_free(obd);
381 class_put_type(obd_type);
384 int class_name2dev(const char *name)
391 read_lock(&obd_dev_lock);
392 for (i = 0; i < class_devno_max(); i++) {
393 struct obd_device *obd = class_num2obd(i);
395 if (obd && strcmp(name, obd->obd_name) == 0) {
396 /* Make sure we finished attaching before we give
397 out any references */
398 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
399 if (obd->obd_attached) {
400 read_unlock(&obd_dev_lock);
406 read_unlock(&obd_dev_lock);
410 EXPORT_SYMBOL(class_name2dev);
412 struct obd_device *class_name2obd(const char *name)
414 int dev = class_name2dev(name);
416 if (dev < 0 || dev > class_devno_max())
418 return class_num2obd(dev);
420 EXPORT_SYMBOL(class_name2obd);
422 int class_uuid2dev(struct obd_uuid *uuid)
426 read_lock(&obd_dev_lock);
427 for (i = 0; i < class_devno_max(); i++) {
428 struct obd_device *obd = class_num2obd(i);
430 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
431 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
432 read_unlock(&obd_dev_lock);
436 read_unlock(&obd_dev_lock);
440 EXPORT_SYMBOL(class_uuid2dev);
442 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
444 int dev = class_uuid2dev(uuid);
448 return class_num2obd(dev);
450 EXPORT_SYMBOL(class_uuid2obd);
453 * Get obd device from ::obd_devs[]
455 * \param num [in] array index
457 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
458 * otherwise return the obd device there.
460 struct obd_device *class_num2obd(int num)
462 struct obd_device *obd = NULL;
464 if (num < class_devno_max()) {
469 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
470 "%p obd_magic %08x != %08x\n",
471 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
472 LASSERTF(obd->obd_minor == num,
473 "%p obd_minor %0d != %0d\n",
474 obd, obd->obd_minor, num);
479 EXPORT_SYMBOL(class_num2obd);
482 * Get obd devices count. Device in any
484 * \retval obd device count
486 int get_devices_count(void)
488 int index, max_index = class_devno_max(), dev_count = 0;
490 read_lock(&obd_dev_lock);
491 for (index = 0; index <= max_index; index++) {
492 struct obd_device *obd = class_num2obd(index);
497 read_unlock(&obd_dev_lock);
501 EXPORT_SYMBOL(get_devices_count);
503 void class_obd_list(void)
508 read_lock(&obd_dev_lock);
509 for (i = 0; i < class_devno_max(); i++) {
510 struct obd_device *obd = class_num2obd(i);
514 if (obd->obd_stopping)
516 else if (obd->obd_set_up)
518 else if (obd->obd_attached)
522 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
523 i, status, obd->obd_type->typ_name,
524 obd->obd_name, obd->obd_uuid.uuid,
525 atomic_read(&obd->obd_refcount));
527 read_unlock(&obd_dev_lock);
530 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
531 specified, then only the client with that uuid is returned,
532 otherwise any client connected to the tgt is returned. */
533 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
534 const char *typ_name,
535 struct obd_uuid *grp_uuid)
539 read_lock(&obd_dev_lock);
540 for (i = 0; i < class_devno_max(); i++) {
541 struct obd_device *obd = class_num2obd(i);
545 if ((strncmp(obd->obd_type->typ_name, typ_name,
546 strlen(typ_name)) == 0)) {
547 if (obd_uuid_equals(tgt_uuid,
548 &obd->u.cli.cl_target_uuid) &&
549 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
550 &obd->obd_uuid) : 1)) {
551 read_unlock(&obd_dev_lock);
556 read_unlock(&obd_dev_lock);
560 EXPORT_SYMBOL(class_find_client_obd);
562 /* Iterate the obd_device list looking devices have grp_uuid. Start
563 searching at *next, and if a device is found, the next index to look
564 at is saved in *next. If next is NULL, then the first matching device
565 will always be returned. */
566 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
572 else if (*next >= 0 && *next < class_devno_max())
577 read_lock(&obd_dev_lock);
578 for (; i < class_devno_max(); i++) {
579 struct obd_device *obd = class_num2obd(i);
583 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
586 read_unlock(&obd_dev_lock);
590 read_unlock(&obd_dev_lock);
594 EXPORT_SYMBOL(class_devices_in_group);
597 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
598 * adjust sptlrpc settings accordingly.
600 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
602 struct obd_device *obd;
606 LASSERT(namelen > 0);
608 read_lock(&obd_dev_lock);
609 for (i = 0; i < class_devno_max(); i++) {
610 obd = class_num2obd(i);
612 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
615 /* only notify mdc, osc, mdt, ost */
616 type = obd->obd_type->typ_name;
617 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
618 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
619 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
620 strcmp(type, LUSTRE_OST_NAME) != 0)
623 if (strncmp(obd->obd_name, fsname, namelen))
626 class_incref(obd, __func__, obd);
627 read_unlock(&obd_dev_lock);
628 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
629 sizeof(KEY_SPTLRPC_CONF),
630 KEY_SPTLRPC_CONF, 0, NULL, NULL);
632 class_decref(obd, __func__, obd);
633 read_lock(&obd_dev_lock);
635 read_unlock(&obd_dev_lock);
638 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
640 void obd_cleanup_caches(void)
642 kmem_cache_destroy(obd_device_cachep);
643 obd_device_cachep = NULL;
644 kmem_cache_destroy(obdo_cachep);
646 kmem_cache_destroy(import_cachep);
647 import_cachep = NULL;
648 kmem_cache_destroy(capa_cachep);
652 int obd_init_caches(void)
654 LASSERT(!obd_device_cachep);
655 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
656 sizeof(struct obd_device),
658 if (!obd_device_cachep)
661 LASSERT(!obdo_cachep);
662 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
667 LASSERT(!import_cachep);
668 import_cachep = kmem_cache_create("ll_import_cache",
669 sizeof(struct obd_import),
674 LASSERT(!capa_cachep);
675 capa_cachep = kmem_cache_create("capa_cache",
676 sizeof(struct obd_capa), 0, 0, NULL);
682 obd_cleanup_caches();
687 /* map connection to client */
688 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 struct obd_export *export;
693 CDEBUG(D_CACHE, "looking for null handle\n");
697 if (conn->cookie == -1) { /* this means assign a new connection */
698 CDEBUG(D_CACHE, "want a new connection\n");
702 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
703 export = class_handle2object(conn->cookie);
706 EXPORT_SYMBOL(class_conn2export);
708 struct obd_device *class_exp2obd(struct obd_export *exp)
714 EXPORT_SYMBOL(class_exp2obd);
716 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 struct obd_export *export;
720 export = class_conn2export(conn);
722 struct obd_device *obd = export->exp_obd;
724 class_export_put(export);
729 EXPORT_SYMBOL(class_conn2obd);
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
733 struct obd_device *obd = exp->exp_obd;
737 return obd->u.cli.cl_import;
739 EXPORT_SYMBOL(class_exp2cliimp);
741 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
743 struct obd_device *obd = class_conn2obd(conn);
747 return obd->u.cli.cl_import;
749 EXPORT_SYMBOL(class_conn2cliimp);
751 /* Export management functions */
752 static void class_export_destroy(struct obd_export *exp)
754 struct obd_device *obd = exp->exp_obd;
756 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
757 LASSERT(obd != NULL);
759 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
760 exp->exp_client_uuid.uuid, obd->obd_name);
762 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
763 if (exp->exp_connection)
764 ptlrpc_put_connection_superhack(exp->exp_connection);
766 LASSERT(list_empty(&exp->exp_outstanding_replies));
767 LASSERT(list_empty(&exp->exp_uncommitted_replies));
768 LASSERT(list_empty(&exp->exp_req_replay_queue));
769 LASSERT(list_empty(&exp->exp_hp_rpcs));
770 obd_destroy_export(exp);
771 class_decref(obd, "export", exp);
773 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
776 static void export_handle_addref(void *export)
778 class_export_get(export);
781 static struct portals_handle_ops export_handle_ops = {
782 .hop_addref = export_handle_addref,
786 struct obd_export *class_export_get(struct obd_export *exp)
788 atomic_inc(&exp->exp_refcount);
789 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
790 atomic_read(&exp->exp_refcount));
793 EXPORT_SYMBOL(class_export_get);
795 void class_export_put(struct obd_export *exp)
797 LASSERT(exp != NULL);
798 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
799 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
800 atomic_read(&exp->exp_refcount) - 1);
802 if (atomic_dec_and_test(&exp->exp_refcount)) {
803 LASSERT(!list_empty(&exp->exp_obd_chain));
804 CDEBUG(D_IOCTL, "final put %p/%s\n",
805 exp, exp->exp_client_uuid.uuid);
807 /* release nid stat refererence */
808 lprocfs_exp_cleanup(exp);
810 obd_zombie_export_add(exp);
813 EXPORT_SYMBOL(class_export_put);
815 /* Creates a new export, adds it to the hash table, and returns a
816 * pointer to it. The refcount is 2: one for the hash reference, and
817 * one for the pointer returned by this function. */
818 struct obd_export *class_new_export(struct obd_device *obd,
819 struct obd_uuid *cluuid)
821 struct obd_export *export;
822 struct cfs_hash *hash = NULL;
825 export = kzalloc(sizeof(*export), GFP_NOFS);
827 return ERR_PTR(-ENOMEM);
829 export->exp_conn_cnt = 0;
830 export->exp_lock_hash = NULL;
831 export->exp_flock_hash = NULL;
832 atomic_set(&export->exp_refcount, 2);
833 atomic_set(&export->exp_rpc_count, 0);
834 atomic_set(&export->exp_cb_count, 0);
835 atomic_set(&export->exp_locks_count, 0);
836 #if LUSTRE_TRACKS_LOCK_EXP_REFS
837 INIT_LIST_HEAD(&export->exp_locks_list);
838 spin_lock_init(&export->exp_locks_list_guard);
840 atomic_set(&export->exp_replay_count, 0);
841 export->exp_obd = obd;
842 INIT_LIST_HEAD(&export->exp_outstanding_replies);
843 spin_lock_init(&export->exp_uncommitted_replies_lock);
844 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
845 INIT_LIST_HEAD(&export->exp_req_replay_queue);
846 INIT_LIST_HEAD(&export->exp_handle.h_link);
847 INIT_LIST_HEAD(&export->exp_hp_rpcs);
848 class_handle_hash(&export->exp_handle, &export_handle_ops);
849 export->exp_last_request_time = get_seconds();
850 spin_lock_init(&export->exp_lock);
851 spin_lock_init(&export->exp_rpc_lock);
852 INIT_HLIST_NODE(&export->exp_uuid_hash);
853 INIT_HLIST_NODE(&export->exp_nid_hash);
854 spin_lock_init(&export->exp_bl_list_lock);
855 INIT_LIST_HEAD(&export->exp_bl_list);
857 export->exp_sp_peer = LUSTRE_SP_ANY;
858 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
859 export->exp_client_uuid = *cluuid;
860 obd_init_export(export);
862 spin_lock(&obd->obd_dev_lock);
863 /* shouldn't happen, but might race */
864 if (obd->obd_stopping) {
869 hash = cfs_hash_getref(obd->obd_uuid_hash);
874 spin_unlock(&obd->obd_dev_lock);
876 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
877 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
879 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
880 obd->obd_name, cluuid->uuid, rc);
886 spin_lock(&obd->obd_dev_lock);
887 if (obd->obd_stopping) {
888 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
893 class_incref(obd, "export", export);
894 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
895 list_add_tail(&export->exp_obd_chain_timed,
896 &export->exp_obd->obd_exports_timed);
897 export->exp_obd->obd_num_exports++;
898 spin_unlock(&obd->obd_dev_lock);
899 cfs_hash_putref(hash);
903 spin_unlock(&obd->obd_dev_lock);
906 cfs_hash_putref(hash);
907 class_handle_unhash(&export->exp_handle);
908 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
909 obd_destroy_export(export);
913 EXPORT_SYMBOL(class_new_export);
915 void class_unlink_export(struct obd_export *exp)
917 class_handle_unhash(&exp->exp_handle);
919 spin_lock(&exp->exp_obd->obd_dev_lock);
920 /* delete an uuid-export hashitem from hashtables */
921 if (!hlist_unhashed(&exp->exp_uuid_hash))
922 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
923 &exp->exp_client_uuid,
924 &exp->exp_uuid_hash);
926 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
927 list_del_init(&exp->exp_obd_chain_timed);
928 exp->exp_obd->obd_num_exports--;
929 spin_unlock(&exp->exp_obd->obd_dev_lock);
930 class_export_put(exp);
932 EXPORT_SYMBOL(class_unlink_export);
934 /* Import management functions */
935 static void class_import_destroy(struct obd_import *imp)
937 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
938 imp->imp_obd->obd_name);
940 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
942 ptlrpc_put_connection_superhack(imp->imp_connection);
944 while (!list_empty(&imp->imp_conn_list)) {
945 struct obd_import_conn *imp_conn;
947 imp_conn = list_entry(imp->imp_conn_list.next,
948 struct obd_import_conn, oic_item);
949 list_del_init(&imp_conn->oic_item);
950 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
954 LASSERT(!imp->imp_sec);
955 class_decref(imp->imp_obd, "import", imp);
956 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
959 static void import_handle_addref(void *import)
961 class_import_get(import);
964 static struct portals_handle_ops import_handle_ops = {
965 .hop_addref = import_handle_addref,
969 struct obd_import *class_import_get(struct obd_import *import)
971 atomic_inc(&import->imp_refcount);
972 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
973 atomic_read(&import->imp_refcount),
974 import->imp_obd->obd_name);
977 EXPORT_SYMBOL(class_import_get);
979 void class_import_put(struct obd_import *imp)
981 LASSERT(list_empty(&imp->imp_zombie_chain));
982 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
984 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
985 atomic_read(&imp->imp_refcount) - 1,
986 imp->imp_obd->obd_name);
988 if (atomic_dec_and_test(&imp->imp_refcount)) {
989 CDEBUG(D_INFO, "final put import %p\n", imp);
990 obd_zombie_import_add(imp);
993 /* catch possible import put race */
994 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
996 EXPORT_SYMBOL(class_import_put);
998 static void init_imp_at(struct imp_at *at)
1002 at_init(&at->iat_net_latency, 0, 0);
1003 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1004 /* max service estimates are tracked on the server side, so
1005 don't use the AT history here, just use the last reported
1006 val. (But keep hist for proc histogram, worst_ever) */
1007 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1012 struct obd_import *class_new_import(struct obd_device *obd)
1014 struct obd_import *imp;
1016 imp = kzalloc(sizeof(*imp), GFP_NOFS);
1020 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1021 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1022 INIT_LIST_HEAD(&imp->imp_replay_list);
1023 INIT_LIST_HEAD(&imp->imp_sending_list);
1024 INIT_LIST_HEAD(&imp->imp_delayed_list);
1025 INIT_LIST_HEAD(&imp->imp_committed_list);
1026 imp->imp_replay_cursor = &imp->imp_committed_list;
1027 spin_lock_init(&imp->imp_lock);
1028 imp->imp_last_success_conn = 0;
1029 imp->imp_state = LUSTRE_IMP_NEW;
1030 imp->imp_obd = class_incref(obd, "import", imp);
1031 mutex_init(&imp->imp_sec_mutex);
1032 init_waitqueue_head(&imp->imp_recovery_waitq);
1034 atomic_set(&imp->imp_refcount, 2);
1035 atomic_set(&imp->imp_unregistering, 0);
1036 atomic_set(&imp->imp_inflight, 0);
1037 atomic_set(&imp->imp_replay_inflight, 0);
1038 atomic_set(&imp->imp_inval_count, 0);
1039 INIT_LIST_HEAD(&imp->imp_conn_list);
1040 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1041 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1042 init_imp_at(&imp->imp_at);
1044 /* the default magic is V2, will be used in connect RPC, and
1045 * then adjusted according to the flags in request/reply. */
1046 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1050 EXPORT_SYMBOL(class_new_import);
1052 void class_destroy_import(struct obd_import *import)
1054 LASSERT(import != NULL);
1055 LASSERT(import != LP_POISON);
1057 class_handle_unhash(&import->imp_handle);
1059 spin_lock(&import->imp_lock);
1060 import->imp_generation++;
1061 spin_unlock(&import->imp_lock);
1062 class_import_put(import);
1064 EXPORT_SYMBOL(class_destroy_import);
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1068 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1070 spin_lock(&exp->exp_locks_list_guard);
1072 LASSERT(lock->l_exp_refs_nr >= 0);
1074 if (lock->l_exp_refs_target != NULL &&
1075 lock->l_exp_refs_target != exp) {
1076 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1077 exp, lock, lock->l_exp_refs_target);
1079 if ((lock->l_exp_refs_nr++) == 0) {
1080 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1081 lock->l_exp_refs_target = exp;
1083 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1084 lock, exp, lock->l_exp_refs_nr);
1085 spin_unlock(&exp->exp_locks_list_guard);
1087 EXPORT_SYMBOL(__class_export_add_lock_ref);
1089 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1091 spin_lock(&exp->exp_locks_list_guard);
1092 LASSERT(lock->l_exp_refs_nr > 0);
1093 if (lock->l_exp_refs_target != exp) {
1094 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1095 lock, lock->l_exp_refs_target, exp);
1097 if (-- lock->l_exp_refs_nr == 0) {
1098 list_del_init(&lock->l_exp_refs_link);
1099 lock->l_exp_refs_target = NULL;
1101 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102 lock, exp, lock->l_exp_refs_nr);
1103 spin_unlock(&exp->exp_locks_list_guard);
1105 EXPORT_SYMBOL(__class_export_del_lock_ref);
1108 /* A connection defines an export context in which preallocation can
1109 be managed. This releases the export pointer reference, and returns
1110 the export handle, so the export refcount is 1 when this function
1112 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1113 struct obd_uuid *cluuid)
1115 struct obd_export *export;
1117 LASSERT(conn != NULL);
1118 LASSERT(obd != NULL);
1119 LASSERT(cluuid != NULL);
1121 export = class_new_export(obd, cluuid);
1123 return PTR_ERR(export);
1125 conn->cookie = export->exp_handle.h_cookie;
1126 class_export_put(export);
1128 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1129 cluuid->uuid, conn->cookie);
1132 EXPORT_SYMBOL(class_connect);
1134 /* if export is involved in recovery then clean up related things */
1135 static void class_export_recovery_cleanup(struct obd_export *exp)
1137 struct obd_device *obd = exp->exp_obd;
1139 spin_lock(&obd->obd_recovery_task_lock);
1140 if (exp->exp_delayed)
1141 obd->obd_delayed_clients--;
1142 if (obd->obd_recovering) {
1143 if (exp->exp_in_recovery) {
1144 spin_lock(&exp->exp_lock);
1145 exp->exp_in_recovery = 0;
1146 spin_unlock(&exp->exp_lock);
1147 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1148 atomic_dec(&obd->obd_connected_clients);
1151 /* if called during recovery then should update
1152 * obd_stale_clients counter,
1153 * lightweight exports are not counted */
1154 if (exp->exp_failed &&
1155 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1156 exp->exp_obd->obd_stale_clients++;
1158 spin_unlock(&obd->obd_recovery_task_lock);
1160 spin_lock(&exp->exp_lock);
1161 /** Cleanup req replay fields */
1162 if (exp->exp_req_replay_needed) {
1163 exp->exp_req_replay_needed = 0;
1165 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1166 atomic_dec(&obd->obd_req_replay_clients);
1169 /** Cleanup lock replay data */
1170 if (exp->exp_lock_replay_needed) {
1171 exp->exp_lock_replay_needed = 0;
1173 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1174 atomic_dec(&obd->obd_lock_replay_clients);
1176 spin_unlock(&exp->exp_lock);
1179 /* This function removes 1-3 references from the export:
1180 * 1 - for export pointer passed
1181 * and if disconnect really need
1182 * 2 - removing from hash
1183 * 3 - in client_unlink_export
1184 * The export pointer passed to this function can destroyed */
1185 int class_disconnect(struct obd_export *export)
1187 int already_disconnected;
1190 CWARN("attempting to free NULL export %p\n", export);
1194 spin_lock(&export->exp_lock);
1195 already_disconnected = export->exp_disconnected;
1196 export->exp_disconnected = 1;
1197 spin_unlock(&export->exp_lock);
1199 /* class_cleanup(), abort_recovery(), and class_fail_export()
1200 * all end up in here, and if any of them race we shouldn't
1201 * call extra class_export_puts(). */
1202 if (already_disconnected) {
1203 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1207 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1208 export->exp_handle.h_cookie);
1210 if (!hlist_unhashed(&export->exp_nid_hash))
1211 cfs_hash_del(export->exp_obd->obd_nid_hash,
1212 &export->exp_connection->c_peer.nid,
1213 &export->exp_nid_hash);
1215 class_export_recovery_cleanup(export);
1216 class_unlink_export(export);
1218 class_export_put(export);
1221 EXPORT_SYMBOL(class_disconnect);
1223 /* Return non-zero for a fully connected export */
1224 int class_connected_export(struct obd_export *exp)
1229 spin_lock(&exp->exp_lock);
1230 connected = exp->exp_conn_cnt > 0;
1231 spin_unlock(&exp->exp_lock);
1236 EXPORT_SYMBOL(class_connected_export);
1238 static void class_disconnect_export_list(struct list_head *list,
1239 enum obd_option flags)
1242 struct obd_export *exp;
1244 /* It's possible that an export may disconnect itself, but
1245 * nothing else will be added to this list. */
1246 while (!list_empty(list)) {
1247 exp = list_entry(list->next, struct obd_export,
1249 /* need for safe call CDEBUG after obd_disconnect */
1250 class_export_get(exp);
1252 spin_lock(&exp->exp_lock);
1253 exp->exp_flags = flags;
1254 spin_unlock(&exp->exp_lock);
1256 if (obd_uuid_equals(&exp->exp_client_uuid,
1257 &exp->exp_obd->obd_uuid)) {
1259 "exp %p export uuid == obd uuid, don't discon\n",
1261 /* Need to delete this now so we don't end up pointing
1262 * to work_list later when this export is cleaned up. */
1263 list_del_init(&exp->exp_obd_chain);
1264 class_export_put(exp);
1268 class_export_get(exp);
1269 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1270 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1271 exp, exp->exp_last_request_time);
1272 /* release one export reference anyway */
1273 rc = obd_disconnect(exp);
1275 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1276 obd_export_nid2str(exp), exp, rc);
1277 class_export_put(exp);
1281 void class_disconnect_exports(struct obd_device *obd)
1283 struct list_head work_list;
1285 /* Move all of the exports from obd_exports to a work list, en masse. */
1286 INIT_LIST_HEAD(&work_list);
1287 spin_lock(&obd->obd_dev_lock);
1288 list_splice_init(&obd->obd_exports, &work_list);
1289 list_splice_init(&obd->obd_delayed_exports, &work_list);
1290 spin_unlock(&obd->obd_dev_lock);
1292 if (!list_empty(&work_list)) {
1293 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1294 obd->obd_minor, obd);
1295 class_disconnect_export_list(&work_list,
1296 exp_flags_from_obd(obd));
1298 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1299 obd->obd_minor, obd);
1301 EXPORT_SYMBOL(class_disconnect_exports);
1303 /* Remove exports that have not completed recovery.
1305 void class_disconnect_stale_exports(struct obd_device *obd,
1306 int (*test_export)(struct obd_export *))
1308 struct list_head work_list;
1309 struct obd_export *exp, *n;
1312 INIT_LIST_HEAD(&work_list);
1313 spin_lock(&obd->obd_dev_lock);
1314 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1316 /* don't count self-export as client */
1317 if (obd_uuid_equals(&exp->exp_client_uuid,
1318 &exp->exp_obd->obd_uuid))
1321 /* don't evict clients which have no slot in last_rcvd
1322 * (e.g. lightweight connection) */
1323 if (exp->exp_target_data.ted_lr_idx == -1)
1326 spin_lock(&exp->exp_lock);
1327 if (exp->exp_failed || test_export(exp)) {
1328 spin_unlock(&exp->exp_lock);
1331 exp->exp_failed = 1;
1332 spin_unlock(&exp->exp_lock);
1334 list_move(&exp->exp_obd_chain, &work_list);
1336 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1337 obd->obd_name, exp->exp_client_uuid.uuid,
1338 !exp->exp_connection ? "<unknown>" :
1339 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1340 print_export_data(exp, "EVICTING", 0);
1342 spin_unlock(&obd->obd_dev_lock);
1345 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1346 obd->obd_name, evicted);
1348 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1349 OBD_OPT_ABORT_RECOV);
1351 EXPORT_SYMBOL(class_disconnect_stale_exports);
1353 void class_fail_export(struct obd_export *exp)
1355 int rc, already_failed;
1357 spin_lock(&exp->exp_lock);
1358 already_failed = exp->exp_failed;
1359 exp->exp_failed = 1;
1360 spin_unlock(&exp->exp_lock);
1362 if (already_failed) {
1363 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1364 exp, exp->exp_client_uuid.uuid);
1368 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1369 exp, exp->exp_client_uuid.uuid);
1371 if (obd_dump_on_timeout)
1372 libcfs_debug_dumplog();
1374 /* need for safe call CDEBUG after obd_disconnect */
1375 class_export_get(exp);
1377 /* Most callers into obd_disconnect are removing their own reference
1378 * (request, for example) in addition to the one from the hash table.
1379 * We don't have such a reference here, so make one. */
1380 class_export_get(exp);
1381 rc = obd_disconnect(exp);
1383 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1385 CDEBUG(D_HA, "disconnected export %p/%s\n",
1386 exp, exp->exp_client_uuid.uuid);
1387 class_export_put(exp);
1389 EXPORT_SYMBOL(class_fail_export);
1391 char *obd_export_nid2str(struct obd_export *exp)
1393 if (exp->exp_connection != NULL)
1394 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1398 EXPORT_SYMBOL(obd_export_nid2str);
1400 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1402 struct cfs_hash *nid_hash;
1403 struct obd_export *doomed_exp = NULL;
1404 int exports_evicted = 0;
1406 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1408 spin_lock(&obd->obd_dev_lock);
1409 /* umount has run already, so evict thread should leave
1410 * its task to umount thread now */
1411 if (obd->obd_stopping) {
1412 spin_unlock(&obd->obd_dev_lock);
1413 return exports_evicted;
1415 nid_hash = obd->obd_nid_hash;
1416 cfs_hash_getref(nid_hash);
1417 spin_unlock(&obd->obd_dev_lock);
1420 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1424 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1425 "nid %s found, wanted nid %s, requested nid %s\n",
1426 obd_export_nid2str(doomed_exp),
1427 libcfs_nid2str(nid_key), nid);
1428 LASSERTF(doomed_exp != obd->obd_self_export,
1429 "self-export is hashed by NID?\n");
1431 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1433 obd_uuid2str(&doomed_exp->exp_client_uuid),
1434 obd_export_nid2str(doomed_exp));
1435 class_fail_export(doomed_exp);
1436 class_export_put(doomed_exp);
1439 cfs_hash_putref(nid_hash);
1441 if (!exports_evicted)
1443 "%s: can't disconnect NID '%s': no exports found\n",
1444 obd->obd_name, nid);
1445 return exports_evicted;
1447 EXPORT_SYMBOL(obd_export_evict_by_nid);
1449 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1451 struct cfs_hash *uuid_hash;
1452 struct obd_export *doomed_exp = NULL;
1453 struct obd_uuid doomed_uuid;
1454 int exports_evicted = 0;
1456 spin_lock(&obd->obd_dev_lock);
1457 if (obd->obd_stopping) {
1458 spin_unlock(&obd->obd_dev_lock);
1459 return exports_evicted;
1461 uuid_hash = obd->obd_uuid_hash;
1462 cfs_hash_getref(uuid_hash);
1463 spin_unlock(&obd->obd_dev_lock);
1465 obd_str2uuid(&doomed_uuid, uuid);
1466 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1467 CERROR("%s: can't evict myself\n", obd->obd_name);
1468 cfs_hash_putref(uuid_hash);
1469 return exports_evicted;
1472 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1475 CERROR("%s: can't disconnect %s: no exports found\n",
1476 obd->obd_name, uuid);
1478 CWARN("%s: evicting %s at administrative request\n",
1479 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1480 class_fail_export(doomed_exp);
1481 class_export_put(doomed_exp);
1484 cfs_hash_putref(uuid_hash);
1486 return exports_evicted;
1488 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1490 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1491 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1492 EXPORT_SYMBOL(class_export_dump_hook);
1495 static void print_export_data(struct obd_export *exp, const char *status,
1498 struct ptlrpc_reply_state *rs;
1499 struct ptlrpc_reply_state *first_reply = NULL;
1502 spin_lock(&exp->exp_lock);
1503 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1509 spin_unlock(&exp->exp_lock);
1511 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1512 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1513 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1514 atomic_read(&exp->exp_rpc_count),
1515 atomic_read(&exp->exp_cb_count),
1516 atomic_read(&exp->exp_locks_count),
1517 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1518 nreplies, first_reply, nreplies > 3 ? "..." : "",
1519 exp->exp_last_committed);
1520 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1521 if (locks && class_export_dump_hook != NULL)
1522 class_export_dump_hook(exp);
1526 void dump_exports(struct obd_device *obd, int locks)
1528 struct obd_export *exp;
1530 spin_lock(&obd->obd_dev_lock);
1531 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1532 print_export_data(exp, "ACTIVE", locks);
1533 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1534 print_export_data(exp, "UNLINKED", locks);
1535 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1536 print_export_data(exp, "DELAYED", locks);
1537 spin_unlock(&obd->obd_dev_lock);
1538 spin_lock(&obd_zombie_impexp_lock);
1539 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1540 print_export_data(exp, "ZOMBIE", locks);
1541 spin_unlock(&obd_zombie_impexp_lock);
1543 EXPORT_SYMBOL(dump_exports);
1545 void obd_exports_barrier(struct obd_device *obd)
1549 LASSERT(list_empty(&obd->obd_exports));
1550 spin_lock(&obd->obd_dev_lock);
1551 while (!list_empty(&obd->obd_unlinked_exports)) {
1552 spin_unlock(&obd->obd_dev_lock);
1553 set_current_state(TASK_UNINTERRUPTIBLE);
1554 schedule_timeout(cfs_time_seconds(waited));
1555 if (waited > 5 && IS_PO2(waited)) {
1556 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1557 obd->obd_name, waited,
1558 atomic_read(&obd->obd_refcount));
1559 dump_exports(obd, 1);
1562 spin_lock(&obd->obd_dev_lock);
1564 spin_unlock(&obd->obd_dev_lock);
1566 EXPORT_SYMBOL(obd_exports_barrier);
1568 /* Total amount of zombies to be destroyed */
1569 static int zombies_count;
1572 * kill zombie imports and exports
1574 void obd_zombie_impexp_cull(void)
1576 struct obd_import *import;
1577 struct obd_export *export;
1580 spin_lock(&obd_zombie_impexp_lock);
1583 if (!list_empty(&obd_zombie_imports)) {
1584 import = list_entry(obd_zombie_imports.next,
1587 list_del_init(&import->imp_zombie_chain);
1591 if (!list_empty(&obd_zombie_exports)) {
1592 export = list_entry(obd_zombie_exports.next,
1595 list_del_init(&export->exp_obd_chain);
1598 spin_unlock(&obd_zombie_impexp_lock);
1600 if (import != NULL) {
1601 class_import_destroy(import);
1602 spin_lock(&obd_zombie_impexp_lock);
1604 spin_unlock(&obd_zombie_impexp_lock);
1607 if (export != NULL) {
1608 class_export_destroy(export);
1609 spin_lock(&obd_zombie_impexp_lock);
1611 spin_unlock(&obd_zombie_impexp_lock);
1615 } while (import != NULL || export != NULL);
1618 static struct completion obd_zombie_start;
1619 static struct completion obd_zombie_stop;
1620 static unsigned long obd_zombie_flags;
1621 static wait_queue_head_t obd_zombie_waitq;
1622 static pid_t obd_zombie_pid;
1625 OBD_ZOMBIE_STOP = 0x0001,
1629 * check for work for kill zombie import/export thread.
1631 static int obd_zombie_impexp_check(void *arg)
1635 spin_lock(&obd_zombie_impexp_lock);
1636 rc = (zombies_count == 0) &&
1637 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1638 spin_unlock(&obd_zombie_impexp_lock);
1644 * Add export to the obd_zombie thread and notify it.
1646 static void obd_zombie_export_add(struct obd_export *exp)
1648 spin_lock(&exp->exp_obd->obd_dev_lock);
1649 LASSERT(!list_empty(&exp->exp_obd_chain));
1650 list_del_init(&exp->exp_obd_chain);
1651 spin_unlock(&exp->exp_obd->obd_dev_lock);
1652 spin_lock(&obd_zombie_impexp_lock);
1654 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1655 spin_unlock(&obd_zombie_impexp_lock);
1657 obd_zombie_impexp_notify();
1661 * Add import to the obd_zombie thread and notify it.
1663 static void obd_zombie_import_add(struct obd_import *imp)
1665 LASSERT(!imp->imp_sec);
1666 LASSERT(!imp->imp_rq_pool);
1667 spin_lock(&obd_zombie_impexp_lock);
1668 LASSERT(list_empty(&imp->imp_zombie_chain));
1670 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1671 spin_unlock(&obd_zombie_impexp_lock);
1673 obd_zombie_impexp_notify();
1677 * notify import/export destroy thread about new zombie.
1679 static void obd_zombie_impexp_notify(void)
1682 * Make sure obd_zombie_impexp_thread get this notification.
1683 * It is possible this signal only get by obd_zombie_barrier, and
1684 * barrier gulps this notification and sleeps away and hangs ensues
1686 wake_up_all(&obd_zombie_waitq);
1690 * check whether obd_zombie is idle
1692 static int obd_zombie_is_idle(void)
1696 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1697 spin_lock(&obd_zombie_impexp_lock);
1698 rc = (zombies_count == 0);
1699 spin_unlock(&obd_zombie_impexp_lock);
1704 * wait when obd_zombie import/export queues become empty
1706 void obd_zombie_barrier(void)
1708 struct l_wait_info lwi = { 0 };
1710 if (obd_zombie_pid == current_pid())
1711 /* don't wait for myself */
1713 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1715 EXPORT_SYMBOL(obd_zombie_barrier);
1719 * destroy zombie export/import thread.
1721 static int obd_zombie_impexp_thread(void *unused)
1723 unshare_fs_struct();
1724 complete(&obd_zombie_start);
1726 obd_zombie_pid = current_pid();
1728 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1729 struct l_wait_info lwi = { 0 };
1731 l_wait_event(obd_zombie_waitq,
1732 !obd_zombie_impexp_check(NULL), &lwi);
1733 obd_zombie_impexp_cull();
1736 * Notify obd_zombie_barrier callers that queues
1739 wake_up(&obd_zombie_waitq);
1742 complete(&obd_zombie_stop);
1749 * start destroy zombie import/export thread
1751 int obd_zombie_impexp_init(void)
1753 struct task_struct *task;
1755 INIT_LIST_HEAD(&obd_zombie_imports);
1756 INIT_LIST_HEAD(&obd_zombie_exports);
1757 spin_lock_init(&obd_zombie_impexp_lock);
1758 init_completion(&obd_zombie_start);
1759 init_completion(&obd_zombie_stop);
1760 init_waitqueue_head(&obd_zombie_waitq);
1763 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1765 return PTR_ERR(task);
1767 wait_for_completion(&obd_zombie_start);
1771 * stop destroy zombie import/export thread
1773 void obd_zombie_impexp_stop(void)
1775 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1776 obd_zombie_impexp_notify();
1777 wait_for_completion(&obd_zombie_stop);
1780 /***** Kernel-userspace comm helpers *******/
1782 /* Get length of entire message, including header */
1783 int kuc_len(int payload_len)
1785 return sizeof(struct kuc_hdr) + payload_len;
1787 EXPORT_SYMBOL(kuc_len);
1789 /* Get a pointer to kuc header, given a ptr to the payload
1790 * @param p Pointer to payload area
1791 * @returns Pointer to kuc header
1793 struct kuc_hdr *kuc_ptr(void *p)
1795 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1797 LASSERT(lh->kuc_magic == KUC_MAGIC);
1800 EXPORT_SYMBOL(kuc_ptr);
1802 /* Test if payload is part of kuc message
1803 * @param p Pointer to payload area
1806 int kuc_ispayload(void *p)
1808 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1810 if (kh->kuc_magic == KUC_MAGIC)
1815 EXPORT_SYMBOL(kuc_ispayload);
1817 /* Alloc space for a message, and fill in header
1818 * @return Pointer to payload area
1820 void *kuc_alloc(int payload_len, int transport, int type)
1823 int len = kuc_len(payload_len);
1825 lh = kzalloc(len, GFP_NOFS);
1827 return ERR_PTR(-ENOMEM);
1829 lh->kuc_magic = KUC_MAGIC;
1830 lh->kuc_transport = transport;
1831 lh->kuc_msgtype = type;
1832 lh->kuc_msglen = len;
1834 return (void *)(lh + 1);
1836 EXPORT_SYMBOL(kuc_alloc);
1838 /* Takes pointer to payload area */
1839 inline void kuc_free(void *p, int payload_len)
1841 struct kuc_hdr *lh = kuc_ptr(p);
1845 EXPORT_SYMBOL(kuc_free);