4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
47 extern struct list_head obd_types;
48 spinlock_t obd_types_lock;
50 struct kmem_cache *obd_device_cachep;
51 struct kmem_cache *obdo_cachep;
52 EXPORT_SYMBOL(obdo_cachep);
53 struct kmem_cache *import_cachep;
55 struct list_head obd_zombie_imports;
56 struct list_head obd_zombie_exports;
57 spinlock_t obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
68 * support functions: we could use inter-module communication, but this
69 * is more portable to other OS's
/*
 * Allocate an obd_device through OBD_SLAB_ALLOC_PTR_GFP() from
 * obd_device_cachep (flag __GFP_IO) and stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): this listing drops lines (see the gaps in the embedded
 * numbering); any allocation-failure check and the return statement are
 * not visible here.
 */
71 static struct obd_device *obd_device_alloc(void)
73 struct obd_device *obd;
75 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
77 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Release an obd_device back to obd_device_cachep.
 * The magic is asserted first; a namespace that is still attached is
 * reported with CERROR; then obd_reference is finalized with lu_ref_fini()
 * and the memory is handed to OBD_SLAB_FREE_PTR().
 * NOTE(review): lines are missing from this listing; what follows the
 * CERROR inside the namespace branch is not visible.
 */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Walk the global obd_types list looking for an entry whose typ_name is
 * equal to \a name (strcmp). obd_types_lock is held during the walk and
 * released both on the match path and after an unsuccessful walk.
 * NOTE(review): the return statements are not part of this listing;
 * presumably the matching type or NULL is returned - confirm against the
 * complete file.
 */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
112 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name via class_search_type(); the visible
 * fallback path tries to load a kernel module and searches again.
 *
 * The module name starts out as \a name with aliases applied:
 * LUSTRE_LWP_NAME loads LUSTRE_OSP_NAME, and any name that begins with
 * LUSTRE_MDS_NAME loads LUSTRE_MDT_NAME ("obdfilter" is special-cased as
 * well, but its replacement is not visible in this listing). When
 * request_module() returns 0 the type list is searched a second time;
 * the LCONSOLE_ERROR_MSG reports a module that could not be loaded.
 *
 * Under type->obd_type_lock the owner module of typ_dt_ops is pinned with
 * try_module_get().
 * NOTE(review): the result of try_module_get() is discarded on the
 * visible line - confirm a failed module get cannot matter here.
 * NOTE(review): lines are missing from this listing, including the
 * conditions guarding these steps and the return statement.
 */
114 struct obd_type *class_get_type(const char *name)
116 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
121 if (strcmp(modname, "obdfilter") == 0)
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
146 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): while holding type->obd_type_lock,
 * drop the reference on the module that owns typ_dt_ops with module_put().
 * NOTE(review): lines are missing from this listing; any other statement
 * executed under the lock is not visible here.
 */
148 void class_put_type(struct obd_type *type)
151 spin_lock(&type->obd_type_lock);
153 module_put(type->typ_dt_ops->o_owner);
154 spin_unlock(&type->obd_type_lock);
156 EXPORT_SYMBOL(class_put_type);
158 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * \param dt_ops  device operations, copied by value into the new type
 * \param md_ops  metadata operations, copied by value (optional, see below)
 * \param vars    not referenced on any line visible in this listing
 * \param name    type name; asserted to be shorter than CLASS_MAX_NAME
 * \param ldt     passed to lu_device_type_init()
 *
 * Visible steps: a debug message when the name is already registered;
 * allocation of the type, both ops tables and a strlen(name) + 1 byte name
 * buffer; copying of ops and name; lock initialisation; registration of a
 * proc entry below proc_lustre_root (an IS_ERR result is turned into rc
 * and typ_procroot is reset to NULL); lu_device_type_init(); and finally
 * insertion into obd_types under obd_types_lock.
 *
 * The trailing cleanup sequence releases name, md_ops, dt_ops and the type
 * itself; the name is freed with the same strlen(name) + 1 size it was
 * allocated with.
 * NOTE(review): lines are missing from this listing (labels, returns and
 * several conditions), so the exact error flow cannot be read from here.
 */
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161 struct lprocfs_vars *vars, const char *name,
162 struct lu_device_type *ldt)
164 struct obd_type *type;
169 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
171 if (class_search_type(name)) {
172 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177 OBD_ALLOC(type, sizeof(*type));
181 OBD_ALLOC_PTR(type->typ_dt_ops);
182 OBD_ALLOC_PTR(type->typ_md_ops);
183 OBD_ALLOC(type->typ_name, strlen(name) + 1);
185 if (type->typ_dt_ops == NULL ||
186 type->typ_md_ops == NULL ||
187 type->typ_name == NULL)
190 *(type->typ_dt_ops) = *dt_ops;
191 /* md_ops is optional */
193 *(type->typ_md_ops) = *md_ops;
194 strcpy(type->typ_name, name);
195 spin_lock_init(&type->obd_type_lock);
198 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
200 if (IS_ERR(type->typ_procroot)) {
201 rc = PTR_ERR(type->typ_procroot);
202 type->typ_procroot = NULL;
208 rc = lu_device_type_init(ldt);
213 spin_lock(&obd_types_lock);
214 list_add(&type->typ_chain, &obd_types);
215 spin_unlock(&obd_types_lock);
220 if (type->typ_name != NULL)
221 OBD_FREE(type->typ_name, strlen(name) + 1);
222 if (type->typ_md_ops != NULL)
223 OBD_FREE_PTR(type->typ_md_ops);
224 if (type->typ_dt_ops != NULL)
225 OBD_FREE_PTR(type->typ_dt_ops);
226 OBD_FREE(type, sizeof(*type));
229 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * The type is looked up with class_search_type(); "unknown obd type" is
 * logged on the failure path. A type whose typ_refcnt is non-zero is
 * reported and has both ops tables freed while the name is kept for
 * debugging. On the regular path the proc entry is removed,
 * lu_device_type_fini() is called on typ_lu, the type is unlinked from
 * obd_types under obd_types_lock, and name, ops tables and the type itself
 * are freed.
 * NOTE(review): in the busy branch the ops tables are freed before any
 * list_del() is visible - confirm nothing can still reach typ_dt_ops or
 * typ_md_ops through obd_types afterwards.
 * NOTE(review): lines are missing from this listing (returns and some
 * conditions), so the exit of each branch cannot be read from here.
 */
231 int class_unregister_type(const char *name)
233 struct obd_type *type = class_search_type(name);
237 CERROR("unknown obd type\n");
241 if (type->typ_refcnt) {
242 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
243 /* This is a bad situation, let's make the best of it */
244 /* Remove ops, but leave the name for debugging */
245 OBD_FREE_PTR(type->typ_dt_ops);
246 OBD_FREE_PTR(type->typ_md_ops);
250 if (type->typ_procroot) {
251 lprocfs_remove(&type->typ_procroot);
255 lu_device_type_fini(type->typ_lu);
257 spin_lock(&obd_types_lock);
258 list_del(&type->typ_chain);
259 spin_unlock(&obd_types_lock);
260 OBD_FREE(type->typ_name, strlen(name) + 1);
261 if (type->typ_dt_ops != NULL)
262 OBD_FREE_PTR(type->typ_dt_ops);
263 if (type->typ_md_ops != NULL)
264 OBD_FREE_PTR(type->typ_md_ops);
265 OBD_FREE(type, sizeof(*type));
267 } /* class_unregister_type */
268 EXPORT_SYMBOL(class_unregister_type);
271 * Create a new obd device.
273 * Find an empty slot in ::obd_devs[], create a new obd device in it.
275 * \param[in] type_name obd device type string.
276 * \param[in] name obd device name.
278 * \retval NULL if create fails, otherwise return the obd device
/*
 * Create a device of type \a type_name called \a name and place it in a
 * slot of obd_devs[].
 *
 * Visible error returns are ERR_PTR() values: -EINVAL when strlen(name)
 * reaches MAX_OBD_NAME, -ENODEV when class_get_type() does not find the
 * type, -ENOMEM when obd_device_alloc() fails, -EEXIST when a device with
 * the same name is found, and -EOVERFLOW when the scan ends without a
 * result.
 *
 * The scan over all slots runs under the obd_dev_lock write lock. A
 * duplicate name undoes a slot that was already claimed in this call
 * (slot cleared, name emptied) before the error is recorded. The first
 * empty slot seen while no result is set receives the new device: minor,
 * type and name are filled in and obd_devs[i] is pointed at it.
 *
 * The final visible statements free the new device and drop the type
 * reference taken above.
 * NOTE(review): strncpy() copies at most sizeof(obd_name) - 1 bytes, so
 * termination of a maximum-length name relies on the last byte already
 * being zero - confirm the allocation zero-fills the device.
 * NOTE(review): lines are missing from this listing (labels, returns,
 * some conditions and the assignment that sets result to the new device).
 */
281 struct obd_device *class_newdev(const char *type_name, const char *name)
283 struct obd_device *result = NULL;
284 struct obd_device *newdev;
285 struct obd_type *type = NULL;
287 int new_obd_minor = 0;
290 if (strlen(name) >= MAX_OBD_NAME) {
291 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
292 RETURN(ERR_PTR(-EINVAL));
295 type = class_get_type(type_name);
297 CERROR("OBD: unknown type: %s\n", type_name);
298 RETURN(ERR_PTR(-ENODEV));
301 newdev = obd_device_alloc();
303 GOTO(out_type, result = ERR_PTR(-ENOMEM));
305 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
307 write_lock(&obd_dev_lock);
308 for (i = 0; i < class_devno_max(); i++) {
309 struct obd_device *obd = class_num2obd(i);
311 if (obd && (strcmp(name, obd->obd_name) == 0)) {
312 CERROR("Device %s already exists at %d, won't add\n",
315 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
316 "%p obd_magic %08x != %08x\n", result,
317 result->obd_magic, OBD_DEVICE_MAGIC);
318 LASSERTF(result->obd_minor == new_obd_minor,
319 "%p obd_minor %d != %d\n", result,
320 result->obd_minor, new_obd_minor);
322 obd_devs[result->obd_minor] = NULL;
323 result->obd_name[0]='\0';
325 result = ERR_PTR(-EEXIST);
328 if (!result && !obd) {
330 result->obd_minor = i;
332 result->obd_type = type;
333 strncpy(result->obd_name, name,
334 sizeof(result->obd_name) - 1);
335 obd_devs[i] = result;
338 write_unlock(&obd_dev_lock);
340 if (result == NULL && i >= class_devno_max()) {
341 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
343 GOTO(out, result = ERR_PTR(-EOVERFLOW));
349 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
350 result->obd_name, result);
354 obd_device_free(newdev);
356 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * Asserts the magic, that the device really occupies the slot named by
 * its obd_minor, and that it has a type. The slot is cleared under the
 * obd_dev_lock write lock, the device is freed with obd_device_free(),
 * and the type reference is dropped last, using the pointer saved before
 * the device was freed.
 */
360 void class_release_dev(struct obd_device *obd)
362 struct obd_type *obd_type = obd->obd_type;
364 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
365 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
366 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
367 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
368 LASSERT(obd_type != NULL);
370 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
371 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
373 write_lock(&obd_dev_lock);
374 obd_devs[obd->obd_minor] = NULL;
375 write_unlock(&obd_dev_lock);
376 obd_device_free(obd);
378 class_put_type(obd_type);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name (strcmp). A match has its magic asserted and is
 * only acted upon while obd_attached is set; the lock is released on that
 * path and again after the loop.
 * NOTE(review): the return statements are not part of this listing;
 * callers visible in this file treat a negative result as "not found".
 */
381 int class_name2dev(const char *name)
388 read_lock(&obd_dev_lock);
389 for (i = 0; i < class_devno_max(); i++) {
390 struct obd_device *obd = class_num2obd(i);
392 if (obd && strcmp(name, obd->obd_name) == 0) {
393 /* Make sure we finished attaching before we give
394 out any references */
395 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
396 if (obd->obd_attached) {
397 read_unlock(&obd_dev_lock);
403 read_unlock(&obd_dev_lock);
407 EXPORT_SYMBOL(class_name2dev);
409 struct obd_device *class_name2obd(const char *name)
411 int dev = class_name2dev(name);
413 if (dev < 0 || dev > class_devno_max())
415 return class_num2obd(dev);
417 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid matches \a uuid according to obd_uuid_equals(). The magic of a
 * match is asserted; the lock is released on the match path and after the
 * loop.
 * NOTE(review): the return statements are not part of this listing.
 */
419 int class_uuid2dev(struct obd_uuid *uuid)
423 read_lock(&obd_dev_lock);
424 for (i = 0; i < class_devno_max(); i++) {
425 struct obd_device *obd = class_num2obd(i);
427 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
428 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
429 read_unlock(&obd_dev_lock);
433 read_unlock(&obd_dev_lock);
437 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve \a uuid to a slot index with class_uuid2dev() and hand that
 * index to class_num2obd().
 * NOTE(review): two lines between the lookup and the return are missing
 * from this listing, so any check of the index is not visible here.
 */
439 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
441 int dev = class_uuid2dev(uuid);
444 return class_num2obd(dev);
446 EXPORT_SYMBOL(class_uuid2obd);
449 * Get obd device from ::obd_devs[]
451 * \param num [in] array index
453 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
454 * otherwise return the obd device there.
/*
 * Fetch the device for slot \a num. The result starts out as NULL and the
 * slot is only considered when num < class_devno_max(); a device that is
 * found must carry OBD_DEVICE_MAGIC and an obd_minor equal to \a num
 * (both asserted).
 * NOTE(review): only an upper-bound test on \a num is visible - confirm
 * callers never pass a negative index.
 * NOTE(review): the lines that read obd_devs[] and return the result are
 * missing from this listing.
 */
456 struct obd_device *class_num2obd(int num)
458 struct obd_device *obd = NULL;
460 if (num < class_devno_max()) {
465 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
466 "%p obd_magic %08x != %08x\n",
467 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
468 LASSERTF(obd->obd_minor == num,
469 "%p obd_minor %0d != %0d\n",
470 obd, obd->obd_minor, num);
475 EXPORT_SYMBOL(class_num2obd);
478 * Get obd devices count. Device in any
480 * \retval obd device count
482 int get_devices_count(void)
484 int index, max_index = class_devno_max(), dev_count = 0;
486 read_lock(&obd_dev_lock);
487 for (index = 0; index <= max_index; index++) {
488 struct obd_device *obd = class_num2obd(index);
492 read_unlock(&obd_dev_lock);
496 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line (LCONSOLE, D_CONFIG) per device in obd_devs[]
 * while holding the obd_dev_lock read lock. Each line carries the slot
 * index, a status chosen from obd_stopping, obd_set_up and obd_attached
 * (tested in that order), the type name, device name, uuid and the current
 * obd_refcount.
 * NOTE(review): the status strings and the handling of empty slots are on
 * lines missing from this listing.
 */
498 void class_obd_list(void)
503 read_lock(&obd_dev_lock);
504 for (i = 0; i < class_devno_max(); i++) {
505 struct obd_device *obd = class_num2obd(i);
509 if (obd->obd_stopping)
511 else if (obd->obd_set_up)
513 else if (obd->obd_attached)
517 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
518 i, status, obd->obd_type->typ_name,
519 obd->obd_name, obd->obd_uuid.uuid,
520 atomic_read(&obd->obd_refcount));
522 read_unlock(&obd_dev_lock);
526 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
527 specified, then only the client with that uuid is returned,
528 otherwise any client connected to the tgt is returned. */
/*
 * Matching rules visible below, applied to every slot under the
 * obd_dev_lock read lock:
 *  - the device type name is compared with strncmp() over
 *    strlen(typ_name), i.e. \a typ_name only has to be a prefix of it;
 *  - u.cli.cl_target_uuid must equal \a tgt_uuid;
 *  - when \a grp_uuid is non-NULL the device uuid must equal it as well.
 * The lock is released on the match path and after the loop.
 * NOTE(review): the return statements and the handling of empty slots are
 * on lines missing from this listing.
 */
529 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
530 const char * typ_name,
531 struct obd_uuid *grp_uuid)
535 read_lock(&obd_dev_lock);
536 for (i = 0; i < class_devno_max(); i++) {
537 struct obd_device *obd = class_num2obd(i);
541 if ((strncmp(obd->obd_type->typ_name, typ_name,
542 strlen(typ_name)) == 0)) {
543 if (obd_uuid_equals(tgt_uuid,
544 &obd->u.cli.cl_target_uuid) &&
545 ((grp_uuid)? obd_uuid_equals(grp_uuid,
546 &obd->obd_uuid) : 1)) {
547 read_unlock(&obd_dev_lock);
552 read_unlock(&obd_dev_lock);
556 EXPORT_SYMBOL(class_find_client_obd);
558 /* Iterate the obd_device list looking devices have grp_uuid. Start
559 searching at *next, and if a device is found, the next index to look
560 at is saved in *next. If next is NULL, then the first matching device
561 will always be returned. */
/*
 * Visible behaviour: a caller-supplied *next is only accepted as a start
 * index when it lies in [0, class_devno_max()); the scan then continues
 * from that index under the obd_dev_lock read lock and tests each device
 * with obd_uuid_equals(grp_uuid, &obd->obd_uuid). The lock is released on
 * the match path and after the loop.
 * NOTE(review): the lines that initialise the index, update *next and
 * return are missing from this listing.
 */
562 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
568 else if (*next >= 0 && *next < class_devno_max())
573 read_lock(&obd_dev_lock);
574 for (; i < class_devno_max(); i++) {
575 struct obd_device *obd = class_num2obd(i);
579 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
582 read_unlock(&obd_dev_lock);
586 read_unlock(&obd_dev_lock);
590 EXPORT_SYMBOL(class_devices_in_group);
593 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
594 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that
 *  - exists, is set up and is not stopping,
 *  - has type LUSTRE_MDC_NAME, LUSTRE_OSC_NAME, LUSTRE_MDT_NAME or
 *    LUSTRE_OST_NAME, and
 *  - has an obd_name whose first \a namelen bytes equal \a fsname.
 *
 * \a namelen is asserted to be positive. The slot scan holds the
 * obd_dev_lock read lock; for a selected device a reference is taken with
 * class_incref(), the lock is dropped around the obd_set_info_async()
 * call, the reference is released and the lock is re-taken before the
 * scan continues.
 * NOTE(review): how rc2 is folded into the return value is on lines
 * missing from this listing.
 */
596 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
598 struct obd_device *obd;
602 LASSERT(namelen > 0);
604 read_lock(&obd_dev_lock);
605 for (i = 0; i < class_devno_max(); i++) {
606 obd = class_num2obd(i);
608 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
611 /* only notify mdc, osc, mdt, ost */
612 type = obd->obd_type->typ_name;
613 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
614 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
615 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
616 strcmp(type, LUSTRE_OST_NAME) != 0)
619 if (strncmp(obd->obd_name, fsname, namelen))
622 class_incref(obd, __FUNCTION__, obd);
623 read_unlock(&obd_dev_lock);
624 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
625 sizeof(KEY_SPTLRPC_CONF),
626 KEY_SPTLRPC_CONF, 0, NULL, NULL);
628 class_decref(obd, __FUNCTION__, obd);
629 read_lock(&obd_dev_lock);
631 read_unlock(&obd_dev_lock);
634 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches used by this file: obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep. Where the listing shows it,
 * the cache pointer is reset to NULL after kmem_cache_destroy().
 * NOTE(review): the guards around three of the four destroy calls and two
 * of the pointer resets are on lines missing from this listing.
 */
636 void obd_cleanup_caches(void)
639 if (obd_device_cachep) {
640 kmem_cache_destroy(obd_device_cachep);
641 obd_device_cachep = NULL;
644 kmem_cache_destroy(obdo_cachep);
648 kmem_cache_destroy(import_cachep);
649 import_cachep = NULL;
652 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches "ll_obd_dev_cache", "ll_obdo_cache",
 * "ll_import_cache" and "capa_cache". Each cache pointer is asserted to be
 * NULL before its cache is created. obd_cleanup_caches() is the last
 * visible call, presumably the failure path - confirm against the
 * complete file.
 * NOTE(review): the failure checks for three of the caches, the labels
 * and the return statements are on lines missing from this listing.
 */
658 int obd_init_caches(void)
662 LASSERT(obd_device_cachep == NULL);
663 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
664 sizeof(struct obd_device),
666 if (!obd_device_cachep)
669 LASSERT(obdo_cachep == NULL);
670 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
675 LASSERT(import_cachep == NULL);
676 import_cachep = kmem_cache_create("ll_import_cache",
677 sizeof(struct obd_import),
682 LASSERT(capa_cachep == NULL);
683 capa_cachep = kmem_cache_create("capa_cache",
684 sizeof(struct obd_capa), 0, 0, NULL);
690 obd_cleanup_caches();
695 /* map connection to client */
/*
 * Translate a connection handle into its export. Two special cases are
 * logged first: a null handle, and a cookie of -1, which means a new
 * connection is wanted. Any other cookie is resolved with
 * class_handle2object().
 * NOTE(review): the conditions for the null-handle case and all return
 * statements are on lines missing from this listing.
 */
696 struct obd_export *class_conn2export(struct lustre_handle *conn)
698 struct obd_export *export;
702 CDEBUG(D_CACHE, "looking for null handle\n");
706 if (conn->cookie == -1) { /* this means assign a new connection */
707 CDEBUG(D_CACHE, "want a new connection\n");
711 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
712 export = class_handle2object(conn->cookie);
715 EXPORT_SYMBOL(class_conn2export);
/*
 * Judging by the signature, maps an export to an obd device.
 * NOTE(review): the whole body is missing from this listing (only the
 * signature and the symbol export remain), so nothing about its behaviour
 * can be confirmed here.
 */
717 struct obd_device *class_exp2obd(struct obd_export *exp)
723 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve \a conn to an export with class_conn2export(), read its exp_obd
 * and release the export again with class_export_put().
 * The device pointer is read before the export is released, and no device
 * reference is taken on any visible line.
 * NOTE(review): the NULL check on the export and the return statements
 * are on lines missing from this listing.
 */
725 struct obd_device *class_conn2obd(struct lustre_handle *conn)
727 struct obd_export *export;
728 export = class_conn2export(conn);
730 struct obd_device *obd = export->exp_obd;
731 class_export_put(export);
736 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * \a exp is dereferenced in the initializer, so it must not be NULL.
 * NOTE(review): two lines between the initializer and the return are
 * missing from this listing; any check of obd is not visible here.
 */
738 struct obd_import *class_exp2cliimp(struct obd_export *exp)
740 struct obd_device *obd = exp->exp_obd;
743 return obd->u.cli.cl_import;
745 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() finds for \a conn.
 * NOTE(review): two lines between the lookup and the return are missing
 * from this listing; any check of obd is not visible here.
 */
747 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
749 struct obd_device *obd = class_conn2obd(conn);
752 return obd->u.cli.cl_import;
754 EXPORT_SYMBOL(class_conn2cliimp);
756 /* Export management functions */
/*
 * Final teardown of an export whose reference count is zero (asserted).
 * Releases the connection, if there is one, through
 * ptlrpc_put_connection_superhack(); asserts that the reply, replay and
 * high-priority RPC lists are empty; calls obd_destroy_export(); drops
 * the device reference tagged "export"; and frees the export with
 * OBD_FREE_RCU() using exp_handle.
 */
757 static void class_export_destroy(struct obd_export *exp)
759 struct obd_device *obd = exp->exp_obd;
762 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
763 LASSERT(obd != NULL);
765 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
766 exp->exp_client_uuid.uuid, obd->obd_name);
768 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
769 if (exp->exp_connection)
770 ptlrpc_put_connection_superhack(exp->exp_connection);
772 LASSERT(list_empty(&exp->exp_outstanding_replies));
773 LASSERT(list_empty(&exp->exp_uncommitted_replies));
774 LASSERT(list_empty(&exp->exp_req_replay_queue));
775 LASSERT(list_empty(&exp->exp_hp_rpcs));
776 obd_destroy_export(exp);
777 class_decref(obd, "export", exp);
779 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref callback for export handles: forwards the untyped handle
 * object to class_export_get().
 */
783 static void export_handle_addref(void *export)
785 class_export_get(export);
/*
 * Handle operations registered for exports (see class_handle_hash() in
 * class_new_export()); taking a reference goes through
 * export_handle_addref().
 * NOTE(review): any further initializers and the closing brace are on
 * lines missing from this listing.
 */
788 static struct portals_handle_ops export_handle_ops = {
789 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp: increments exp_refcount and logs the new
 * count at D_INFO.
 * NOTE(review): the return statement is on a line missing from this
 * listing.
 */
793 struct obd_export *class_export_get(struct obd_export *exp)
795 atomic_inc(&exp->exp_refcount);
796 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
797 atomic_read(&exp->exp_refcount));
800 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. The export must be non-NULL and its count
 * must lie strictly between 0 and LI_POISON (both asserted). When the
 * count reaches zero the export must still be linked through
 * exp_obd_chain (asserted); its nid statistics are released with
 * lprocfs_exp_cleanup() and the export is queued with
 * obd_zombie_export_add() rather than being destroyed inline.
 */
802 void class_export_put(struct obd_export *exp)
804 LASSERT(exp != NULL);
805 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
806 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
807 atomic_read(&exp->exp_refcount) - 1);
809 if (atomic_dec_and_test(&exp->exp_refcount)) {
810 LASSERT(!list_empty(&exp->exp_obd_chain));
811 CDEBUG(D_IOCTL, "final put %p/%s\n",
812 exp, exp->exp_client_uuid.uuid);
814 /* release nid stat reference */
815 lprocfs_exp_cleanup(exp);
817 obd_zombie_export_add(exp);
820 EXPORT_SYMBOL(class_export_put);
822 /* Creates a new export, adds it to the hash table, and returns a
823 * pointer to it. The refcount is 2: one for the hash reference, and
824 * one for the pointer returned by this function. */
/*
 * Sequence visible below:
 *  1. allocate the export (ERR_PTR(-ENOMEM) on failure) and initialise
 *     counters, lists and locks; exp_refcount starts at 2 and the handle
 *     is registered with class_handle_hash();
 *  2. under obd->obd_dev_lock: give up with -ENODEV when the device is
 *     stopping, otherwise take a reference on obd_uuid_hash (a second
 *     -ENODEV exit follows that call);
 *  3. without the lock: unless \a cluuid equals the device's own uuid,
 *     insert the export into the uuid hash with cfs_hash_add_unique(); a
 *     duplicate is reported and becomes -EALREADY;
 *  4. under the lock again: re-check obd_stopping (undoing the hash
 *     insert and failing with -ENODEV), then take a device reference,
 *     link the export into obd_exports and obd_exports_timed and bump
 *     obd_num_exports;
 *  5. drop the hash reference.
 * The trailing error path unlocks, drops the hash reference, unhashes the
 * handle, asserts the export is not in the uuid hash, and destroys and
 * frees the export.
 * NOTE(review): labels, return statements and some conditions are on
 * lines missing from this listing.
 */
825 struct obd_export *class_new_export(struct obd_device *obd,
826 struct obd_uuid *cluuid)
828 struct obd_export *export;
829 cfs_hash_t *hash = NULL;
833 OBD_ALLOC_PTR(export);
835 return ERR_PTR(-ENOMEM);
837 export->exp_conn_cnt = 0;
838 export->exp_lock_hash = NULL;
839 export->exp_flock_hash = NULL;
840 atomic_set(&export->exp_refcount, 2);
841 atomic_set(&export->exp_rpc_count, 0);
842 atomic_set(&export->exp_cb_count, 0);
843 atomic_set(&export->exp_locks_count, 0);
844 #if LUSTRE_TRACKS_LOCK_EXP_REFS
845 INIT_LIST_HEAD(&export->exp_locks_list);
846 spin_lock_init(&export->exp_locks_list_guard);
848 atomic_set(&export->exp_replay_count, 0);
849 export->exp_obd = obd;
850 INIT_LIST_HEAD(&export->exp_outstanding_replies);
851 spin_lock_init(&export->exp_uncommitted_replies_lock);
852 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
853 INIT_LIST_HEAD(&export->exp_req_replay_queue);
854 INIT_LIST_HEAD(&export->exp_handle.h_link);
855 INIT_LIST_HEAD(&export->exp_hp_rpcs);
856 class_handle_hash(&export->exp_handle, &export_handle_ops);
857 export->exp_last_request_time = cfs_time_current_sec();
858 spin_lock_init(&export->exp_lock);
859 spin_lock_init(&export->exp_rpc_lock);
860 INIT_HLIST_NODE(&export->exp_uuid_hash);
861 INIT_HLIST_NODE(&export->exp_nid_hash);
862 spin_lock_init(&export->exp_bl_list_lock);
863 INIT_LIST_HEAD(&export->exp_bl_list);
865 export->exp_sp_peer = LUSTRE_SP_ANY;
866 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
867 export->exp_client_uuid = *cluuid;
868 obd_init_export(export);
870 spin_lock(&obd->obd_dev_lock);
871 /* shouldn't happen, but might race */
872 if (obd->obd_stopping)
873 GOTO(exit_unlock, rc = -ENODEV);
875 hash = cfs_hash_getref(obd->obd_uuid_hash);
877 GOTO(exit_unlock, rc = -ENODEV);
878 spin_unlock(&obd->obd_dev_lock);
880 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
881 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
883 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
884 obd->obd_name, cluuid->uuid, rc);
885 GOTO(exit_err, rc = -EALREADY);
889 spin_lock(&obd->obd_dev_lock);
890 if (obd->obd_stopping) {
891 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
892 GOTO(exit_unlock, rc = -ENODEV);
895 class_incref(obd, "export", export);
896 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
897 list_add_tail(&export->exp_obd_chain_timed,
898 &export->exp_obd->obd_exports_timed);
899 export->exp_obd->obd_num_exports++;
900 spin_unlock(&obd->obd_dev_lock);
901 cfs_hash_putref(hash);
905 spin_unlock(&obd->obd_dev_lock);
908 cfs_hash_putref(hash);
909 class_handle_unhash(&export->exp_handle);
910 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
911 obd_destroy_export(export);
912 OBD_FREE_PTR(export);
915 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device. The handle is unhashed first; then,
 * under the device's obd_dev_lock, the export is removed from the uuid
 * hash if it is hashed there, moved from its current list to
 * obd_unlinked_exports, taken off the timed list, and obd_num_exports is
 * decremented. One export reference is dropped at the end.
 */
917 void class_unlink_export(struct obd_export *exp)
919 class_handle_unhash(&exp->exp_handle);
921 spin_lock(&exp->exp_obd->obd_dev_lock);
922 /* delete an uuid-export hashitem from hashtables */
923 if (!hlist_unhashed(&exp->exp_uuid_hash))
924 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
925 &exp->exp_client_uuid,
926 &exp->exp_uuid_hash);
928 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
929 list_del_init(&exp->exp_obd_chain_timed);
930 exp->exp_obd->obd_num_exports--;
931 spin_unlock(&exp->exp_obd->obd_dev_lock);
932 class_export_put(exp);
934 EXPORT_SYMBOL(class_unlink_export);
936 /* Import management functions */
/*
 * Final teardown of an import whose reference count is zero (asserted).
 * Releases the primary connection, then empties imp_conn_list: each entry
 * is unlinked, its connection released and the entry freed. imp_sec must
 * already be NULL (asserted). Finally the device reference tagged
 * "import" is dropped and the import is freed with OBD_FREE_RCU() using
 * imp_handle.
 */
937 void class_import_destroy(struct obd_import *imp)
941 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
942 imp->imp_obd->obd_name);
944 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
946 ptlrpc_put_connection_superhack(imp->imp_connection);
948 while (!list_empty(&imp->imp_conn_list)) {
949 struct obd_import_conn *imp_conn;
951 imp_conn = list_entry(imp->imp_conn_list.next,
952 struct obd_import_conn, oic_item);
953 list_del_init(&imp_conn->oic_item);
954 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
955 OBD_FREE(imp_conn, sizeof(*imp_conn));
958 LASSERT(imp->imp_sec == NULL);
959 class_decref(imp->imp_obd, "import", imp);
960 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * hop_addref callback for import handles: forwards the untyped handle
 * object to class_import_get().
 */
964 static void import_handle_addref(void *import)
966 class_import_get(import);
/*
 * Handle operations registered for imports (see class_handle_hash() in
 * class_new_import()); taking a reference goes through
 * import_handle_addref().
 * NOTE(review): any further initializers and the closing brace are on
 * lines missing from this listing.
 */
969 static struct portals_handle_ops import_handle_ops = {
970 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import: increments imp_refcount and logs the new
 * count together with the owning device name at D_INFO.
 * NOTE(review): the return statement is on a line missing from this
 * listing.
 */
974 struct obd_import *class_import_get(struct obd_import *import)
976 atomic_inc(&import->imp_refcount);
977 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
978 atomic_read(&import->imp_refcount),
979 import->imp_obd->obd_name);
982 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. The import must not be on the zombie chain
 * and its count must lie strictly between 0 and LI_POISON (both
 * asserted). When the count reaches zero the import is queued with
 * obd_zombie_import_add() rather than being destroyed inline. A final
 * assertion re-checks that the count is still within [0, LI_POISON).
 */
984 void class_import_put(struct obd_import *imp)
988 LASSERT(list_empty(&imp->imp_zombie_chain));
989 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
991 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
992 atomic_read(&imp->imp_refcount) - 1,
993 imp->imp_obd->obd_name);
995 if (atomic_dec_and_test(&imp->imp_refcount)) {
996 CDEBUG(D_INFO, "final put import %p\n", imp);
997 obd_zombie_import_add(imp);
1000 /* catch possible import put race */
1001 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1004 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate with at_init(..., 0, 0) and one service estimate per portal
 * (IMP_AT_MAX_PORTALS of them) starting from INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is on
 * a line missing from this listing.
 */
1006 static void init_imp_at(struct imp_at *at) {
1008 at_init(&at->iat_net_latency, 0, 0);
1009 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1010 /* max service estimates are tracked on the server side, so
1011 don't use the AT history here, just use the last reported
1012 val. (But keep hist for proc histogram, worst_ever) */
1013 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * Lists, lock, mutex and wait queue are initialised; the state starts as
 * LUSTRE_IMP_NEW; a device reference tagged "import" is taken and stored
 * in imp_obd; imp_refcount starts at 2 and the other counters at 0; the
 * handle is registered with class_handle_hash(); adaptive timeouts are
 * set up by init_imp_at(); and the message magic defaults to
 * LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return statement
 * are on lines missing from this listing.
 */
1018 struct obd_import *class_new_import(struct obd_device *obd)
1020 struct obd_import *imp;
1022 OBD_ALLOC(imp, sizeof(*imp));
1026 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1027 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1028 INIT_LIST_HEAD(&imp->imp_replay_list);
1029 INIT_LIST_HEAD(&imp->imp_sending_list);
1030 INIT_LIST_HEAD(&imp->imp_delayed_list);
1031 spin_lock_init(&imp->imp_lock);
1032 imp->imp_last_success_conn = 0;
1033 imp->imp_state = LUSTRE_IMP_NEW;
1034 imp->imp_obd = class_incref(obd, "import", imp);
1035 mutex_init(&imp->imp_sec_mutex);
1036 init_waitqueue_head(&imp->imp_recovery_waitq);
1038 atomic_set(&imp->imp_refcount, 2);
1039 atomic_set(&imp->imp_unregistering, 0);
1040 atomic_set(&imp->imp_inflight, 0);
1041 atomic_set(&imp->imp_replay_inflight, 0);
1042 atomic_set(&imp->imp_inval_count, 0);
1043 INIT_LIST_HEAD(&imp->imp_conn_list);
1044 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1045 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1046 init_imp_at(&imp->imp_at);
1048 /* the default magic is V2, will be used in connect RPC, and
1049 * then adjusted according to the flags in request/reply. */
1050 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1054 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down \a import (must be neither NULL nor LP_POISON):
 * unhash its handle, bump imp_generation under imp_lock, and drop one
 * reference with class_import_put().
 */
1056 void class_destroy_import(struct obd_import *import)
1058 LASSERT(import != NULL);
1059 LASSERT(import != LP_POISON);
1061 class_handle_unhash(&import->imp_handle);
1063 spin_lock(&import->imp_lock);
1064 import->imp_generation++;
1065 spin_unlock(&import->imp_lock);
1066 class_import_put(import);
1068 EXPORT_SYMBOL(class_destroy_import);
1070 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record that \a lock holds a reference on \a exp (lock/export reference
 * tracking). Runs under exp->exp_locks_list_guard. A lock that already
 * points at a different export is reported with a console warning. The
 * per-lock counter is incremented; on the transition from 0 the lock is
 * linked into exp_locks_list and l_exp_refs_target is set to \a exp.
 */
1072 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1074 spin_lock(&exp->exp_locks_list_guard);
1076 LASSERT(lock->l_exp_refs_nr >= 0);
1078 if (lock->l_exp_refs_target != NULL &&
1079 lock->l_exp_refs_target != exp) {
1080 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1081 exp, lock, lock->l_exp_refs_target);
1083 if ((lock->l_exp_refs_nr ++) == 0) {
1084 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1085 lock->l_exp_refs_target = exp;
1087 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1088 lock, exp, lock->l_exp_refs_nr);
1089 spin_unlock(&exp->exp_locks_list_guard);
1091 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Undo one __class_export_add_lock_ref(). Runs under
 * exp->exp_locks_list_guard. The per-lock counter must be positive
 * (asserted); a target that differs from \a exp is reported with a
 * console warning. When the counter drops to 0 the lock is unlinked from
 * the export's list and l_exp_refs_target is cleared.
 */
1093 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1095 spin_lock(&exp->exp_locks_list_guard);
1096 LASSERT(lock->l_exp_refs_nr > 0);
1097 if (lock->l_exp_refs_target != exp) {
1098 LCONSOLE_WARN("lock %p, "
1099 "mismatching export pointers: %p, %p\n",
1100 lock, lock->l_exp_refs_target, exp);
1102 if (-- lock->l_exp_refs_nr == 0) {
1103 list_del_init(&lock->l_exp_refs_link);
1104 lock->l_exp_refs_target = NULL;
1106 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1107 lock, exp, lock->l_exp_refs_nr);
1108 spin_unlock(&exp->exp_locks_list_guard);
1110 EXPORT_SYMBOL(__class_export_del_lock_ref);
1113 /* A connection defines an export context in which preallocation can
1114 be managed. This releases the export pointer reference, and returns
1115 the export handle, so the export refcount is 1 when this function
/*
 * Create an export for \a cluuid on \a obd and report its handle cookie
 * through \a conn. All three arguments must be non-NULL (asserted).
 * A failure of class_new_export() is returned as PTR_ERR(). On success
 * the cookie is copied into \a conn and the pointer reference obtained
 * from class_new_export() is dropped again with class_export_put().
 * NOTE(review): the error test on the new export and the final return
 * are on lines missing from this listing.
 */
1117 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1118 struct obd_uuid *cluuid)
1120 struct obd_export *export;
1121 LASSERT(conn != NULL);
1122 LASSERT(obd != NULL);
1123 LASSERT(cluuid != NULL);
1126 export = class_new_export(obd, cluuid);
1128 RETURN(PTR_ERR(export));
1130 conn->cookie = export->exp_handle.h_cookie;
1131 class_export_put(export);
1133 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1134 cluuid->uuid, conn->cookie);
1137 EXPORT_SYMBOL(class_connect);
1139 /* if export is involved in recovery then clean up related things */
/*
 * Adjust the device's recovery bookkeeping for an export that is going
 * away.
 *
 * Under obd->obd_recovery_task_lock: a delayed export decrements
 * obd_delayed_clients; while the device is recovering, an export that was
 * in recovery has exp_in_recovery cleared (under exp_lock) and
 * obd_connected_clients decremented, and a failed export without
 * OBD_CONNECT_LIGHTWEIGHT increments obd_stale_clients.
 *
 * After that lock is released, pending request-replay and lock-replay
 * flags are cleared (each under exp_lock) and the matching device
 * counters are decremented; both counters are asserted to be non-zero
 * first.
 */
1140 void class_export_recovery_cleanup(struct obd_export *exp)
1142 struct obd_device *obd = exp->exp_obd;
1144 spin_lock(&obd->obd_recovery_task_lock);
1145 if (exp->exp_delayed)
1146 obd->obd_delayed_clients--;
1147 if (obd->obd_recovering) {
1148 if (exp->exp_in_recovery) {
1149 spin_lock(&exp->exp_lock);
1150 exp->exp_in_recovery = 0;
1151 spin_unlock(&exp->exp_lock);
1152 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1153 atomic_dec(&obd->obd_connected_clients);
1156 /* if called during recovery then should update
1157 * obd_stale_clients counter,
1158 * lightweight exports are not counted */
1159 if (exp->exp_failed &&
1160 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1161 exp->exp_obd->obd_stale_clients++;
1163 spin_unlock(&obd->obd_recovery_task_lock);
1164 /** Cleanup req replay fields */
1165 if (exp->exp_req_replay_needed) {
1166 spin_lock(&exp->exp_lock);
1167 exp->exp_req_replay_needed = 0;
1168 spin_unlock(&exp->exp_lock);
1169 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1170 atomic_dec(&obd->obd_req_replay_clients);
1172 /** Cleanup lock replay data */
1173 if (exp->exp_lock_replay_needed) {
1174 spin_lock(&exp->exp_lock);
1175 exp->exp_lock_replay_needed = 0;
1176 spin_unlock(&exp->exp_lock);
1177 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1178 atomic_dec(&obd->obd_lock_replay_clients);
1182 /* This function removes 1-3 references from the export:
1183 * 1 - for export pointer passed
1184 * and if disconnect really need
1185 * 2 - removing from hash
1186 * 3 - in client_unlink_export
1187 * The export pointer passed to this function can destroyed */
/*
 * Disconnect \a export.
 *
 * A NULL export only produces a warning. Otherwise exp_disconnected is
 * read and set to 1 in one critical section under exp_lock, so only the
 * first caller sees it as 0. A caller that finds the export already
 * disconnected skips the teardown (the export must no longer be in the
 * nid hash, asserted). The first caller removes the export from the
 * device's nid hash if it is hashed there, runs
 * class_export_recovery_cleanup() and class_unlink_export(). The last
 * visible statement drops the caller's reference with class_export_put().
 * NOTE(review): the label targeted by the GOTO and the return statements
 * are on lines missing from this listing.
 */
1188 int class_disconnect(struct obd_export *export)
1190 int already_disconnected;
1193 if (export == NULL) {
1194 CWARN("attempting to free NULL export %p\n", export);
1198 spin_lock(&export->exp_lock);
1199 already_disconnected = export->exp_disconnected;
1200 export->exp_disconnected = 1;
1201 spin_unlock(&export->exp_lock);
1203 /* class_cleanup(), abort_recovery(), and class_fail_export()
1204 * all end up in here, and if any of them race we shouldn't
1205 * call extra class_export_puts(). */
1206 if (already_disconnected) {
1207 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1208 GOTO(no_disconn, already_disconnected);
1211 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1212 export->exp_handle.h_cookie);
1214 if (!hlist_unhashed(&export->exp_nid_hash))
1215 cfs_hash_del(export->exp_obd->obd_nid_hash,
1216 &export->exp_connection->c_peer.nid,
1217 &export->exp_nid_hash);
1219 class_export_recovery_cleanup(export);
1220 class_unlink_export(export);
1222 class_export_put(export);
1225 EXPORT_SYMBOL(class_disconnect);
1227 /* Return non-zero for a fully connected export */
/*
 * Sample, under exp_lock, whether exp_conn_cnt is greater than zero.
 * NOTE(review): the declaration of the result, any NULL check on \a exp
 * and the return statement are on lines missing from this listing.
 */
1228 int class_connected_export(struct obd_export *exp)
1232 spin_lock(&exp->exp_lock);
1233 connected = (exp->exp_conn_cnt > 0);
1234 spin_unlock(&exp->exp_lock);
1239 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, always working on the first entry
 * until the list is empty.
 *
 * For each export: a reference is taken so the export can still be named
 * in the debug message after obd_disconnect(); exp_flags is overwritten
 * with \a flags under exp_lock. An export whose client uuid equals the
 * device's own uuid is not disconnected - it is only unlinked from the
 * list and the reference is dropped. For any other export a second
 * reference is taken before obd_disconnect(), which per the comment below
 * releases one reference itself; the first reference is dropped after the
 * result has been logged.
 * NOTE(review): the list member named in list_entry() and the statement
 * that ends the self-export branch are on lines missing from this
 * listing.
 */
1241 static void class_disconnect_export_list(struct list_head *list,
1242 enum obd_option flags)
1245 struct obd_export *exp;
1248 /* It's possible that an export may disconnect itself, but
1249 * nothing else will be added to this list. */
1250 while (!list_empty(list)) {
1251 exp = list_entry(list->next, struct obd_export,
1253 /* need for safe call CDEBUG after obd_disconnect */
1254 class_export_get(exp);
1256 spin_lock(&exp->exp_lock);
1257 exp->exp_flags = flags;
1258 spin_unlock(&exp->exp_lock);
1260 if (obd_uuid_equals(&exp->exp_client_uuid,
1261 &exp->exp_obd->obd_uuid)) {
1263 "exp %p export uuid == obd uuid, don't discon\n",
1265 /* Need to delete this now so we don't end up pointing
1266 * to work_list later when this export is cleaned up. */
1267 list_del_init(&exp->exp_obd_chain);
1268 class_export_put(exp);
1272 class_export_get(exp);
1273 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1274 "last request at "CFS_TIME_T"\n",
1275 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1276 exp, exp->exp_last_request_time);
1277 /* release one export reference anyway */
1278 rc = obd_disconnect(exp);
1280 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1281 obd_export_nid2str(exp), exp, rc);
1282 class_export_put(exp);
/*
 * Disconnect all exports of \a obd. Under obd_dev_lock both obd_exports
 * and obd_delayed_exports are spliced onto a private work list, leaving
 * the device lists empty; the work list is then processed without the
 * lock by class_disconnect_export_list() using the flags derived from
 * the device by exp_flags_from_obd(). An empty work list only produces a
 * debug message.
 */
1287 void class_disconnect_exports(struct obd_device *obd)
1289 struct list_head work_list;
1292 /* Move all of the exports from obd_exports to a work list, en masse. */
1293 INIT_LIST_HEAD(&work_list);
1294 spin_lock(&obd->obd_dev_lock);
1295 list_splice_init(&obd->obd_exports, &work_list);
1296 list_splice_init(&obd->obd_delayed_exports, &work_list);
1297 spin_unlock(&obd->obd_dev_lock);
1299 if (!list_empty(&work_list)) {
1300 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1301 "disconnecting them\n", obd->obd_minor, obd);
1302 class_disconnect_export_list(&work_list,
1303 exp_flags_from_obd(obd));
1305 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1306 obd->obd_minor, obd);
1309 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Select stale exports on obd->obd_exports and disconnect them.
 *
 * obd:         device whose export list is scanned.
 * test_export: predicate called with exp->exp_lock held; an export for
 *              which it returns non-zero takes the same early-unlock
 *              branch as an export that is already marked exp_failed.
 *
 * The scan runs under obd->obd_dev_lock.  Checks are made for the
 * self-export (client uuid equal to the obd uuid) and for exports with
 * ted_lr_idx == -1; the comments next to those checks say such exports
 * are not counted or evicted.  Every export that reaches the end of the
 * loop body is marked exp_failed, moved to a local work list and
 * logged.  After the device lock is dropped the work list is passed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV OR-ed into
 * the flags from exp_flags_from_obd(obd).
 *
 * NOTE(review): test_export runs with two spinlocks held
 * (obd_dev_lock and exp_lock), so it presumably must not sleep --
 * confirm against its implementations.
 */
1311 /* Remove exports that have not completed recovery.
1313 void class_disconnect_stale_exports(struct obd_device *obd,
1314 int (*test_export)(struct obd_export *))
1316 struct list_head work_list;
1317 struct obd_export *exp, *n;
1321 INIT_LIST_HEAD(&work_list);
1322 spin_lock(&obd->obd_dev_lock);
1323 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1325 /* don't count self-export as client */
1326 if (obd_uuid_equals(&exp->exp_client_uuid,
1327 &exp->exp_obd->obd_uuid))
1330 /* don't evict clients which have no slot in last_rcvd
1331 * (e.g. lightweight connection) */
1332 if (exp->exp_target_data.ted_lr_idx == -1)
1335 spin_lock(&exp->exp_lock);
1336 if (exp->exp_failed || test_export(exp)) {
1337 spin_unlock(&exp->exp_lock);
1340 exp->exp_failed = 1;
1341 spin_unlock(&exp->exp_lock);
1343 list_move(&exp->exp_obd_chain, &work_list);
1345 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1346 obd->obd_name, exp->exp_client_uuid.uuid,
1347 exp->exp_connection == NULL ? "<unknown>" :
1348 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1349 print_export_data(exp, "EVICTING", 0);
1351 spin_unlock(&obd->obd_dev_lock);
1354 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1355 obd->obd_name, evicted);
1357 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1358 OBD_OPT_ABORT_RECOV);
1361 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark exp as failed and disconnect it.
 *
 * exp->exp_failed is read and then set to 1 inside one exp->exp_lock
 * critical section, so the previous value is captured together with
 * the update.  For an export that was already failed the only visible
 * action is a "skipping" debug message.  Otherwise the debug log is
 * dumped when obd_dump_on_timeout is set, two references are taken
 * (see the comments at each class_export_get() call) and
 * obd_disconnect() is called; a CERROR line reports a failed
 * disconnect and a D_HA line reports a completed one.
 *
 * NOTE(review): the condition choosing between the CERROR and the
 * final CDEBUG is presumably a test of rc -- confirm.
 */
1363 void class_fail_export(struct obd_export *exp)
1365 int rc, already_failed;
1367 spin_lock(&exp->exp_lock);
1368 already_failed = exp->exp_failed;
1369 exp->exp_failed = 1;
1370 spin_unlock(&exp->exp_lock);
1372 if (already_failed) {
1373 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1374 exp, exp->exp_client_uuid.uuid);
1378 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1379 exp, exp->exp_client_uuid.uuid);
1381 if (obd_dump_on_timeout)
1382 libcfs_debug_dumplog();
1384 /* Hold an extra reference so that exp can still be passed to
 * CDEBUG() after obd_disconnect() has run. */
1385 class_export_get(exp);
1387 /* Most callers into obd_disconnect are removing their own reference
1388 * (request, for example) in addition to the one from the hash table.
1389 * We don't have such a reference here, so make one. */
1390 class_export_get(exp);
1391 rc = obd_disconnect(exp);
1393 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1395 CDEBUG(D_HA, "disconnected export %p/%s\n",
1396 exp, exp->exp_client_uuid.uuid);
1397 class_export_put(exp);
1399 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the NID of the peer of exp as a string.
 *
 * When exp->exp_connection is set the result is
 * libcfs_nid2str(exp->exp_connection->c_peer.nid).
 *
 * NOTE(review): the value returned for an export without a connection
 * is not visible next to this code -- check before assuming what a
 * caller receives in that case.
 */
1401 char *obd_export_nid2str(struct obd_export *exp)
1403 if (exp->exp_connection != NULL)
1404 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1408 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict from obd the export(s) connected from the NID given as text.
 *
 * obd: target device.
 * nid: textual NID; converted to the hash key with libcfs_str2nid().
 *
 * Returns the value of exports_evicted.  When obd->obd_stopping is set
 * the function returns early with that counter still 0.
 *
 * A reference on obd->obd_nid_hash is taken under obd->obd_dev_lock and
 * dropped with cfs_hash_putref() before the final return.  An export
 * found by cfs_hash_lookup() is sanity-checked with two LASSERTFs
 * (its peer NID equals the key; it is not obd->obd_self_export),
 * announced on the console, failed through class_fail_export() and
 * then passed to class_export_put().
 *
 * NOTE(review): the loop around the lookup and the increment of
 * exports_evicted are presumably present between these statements --
 * confirm.
 */
1410 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1412 cfs_hash_t *nid_hash;
1413 struct obd_export *doomed_exp = NULL;
1414 int exports_evicted = 0;
/* NOTE(review): the cast drops const; libcfs_str2nid() presumably does
 * not modify the string -- confirm. */
1416 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1418 spin_lock(&obd->obd_dev_lock);
1419 /* umount has run already, so evict thread should leave
1420 * its task to umount thread now */
1421 if (obd->obd_stopping) {
1422 spin_unlock(&obd->obd_dev_lock);
1423 return exports_evicted;
1425 nid_hash = obd->obd_nid_hash;
1426 cfs_hash_getref(nid_hash);
1427 spin_unlock(&obd->obd_dev_lock);
1430 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1431 if (doomed_exp == NULL)
1434 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1435 "nid %s found, wanted nid %s, requested nid %s\n",
1436 obd_export_nid2str(doomed_exp),
1437 libcfs_nid2str(nid_key), nid);
1438 LASSERTF(doomed_exp != obd->obd_self_export,
1439 "self-export is hashed by NID?\n");
1441 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1442 "request\n", obd->obd_name,
1443 obd_uuid2str(&doomed_exp->exp_client_uuid),
1444 obd_export_nid2str(doomed_exp));
1445 class_fail_export(doomed_exp);
1446 class_export_put(doomed_exp);
1449 cfs_hash_putref(nid_hash);
1451 if (!exports_evicted)
1452 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1453 obd->obd_name, nid);
1454 return exports_evicted;
1456 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict from obd the export whose client uuid matches the given text.
 *
 * obd:  target device.
 * uuid: textual uuid; converted with obd_str2uuid() into the hash key.
 *
 * Returns the value of exports_evicted.  The function returns early,
 * with that counter still 0, when obd->obd_stopping is set or when the
 * uuid equals obd->obd_uuid ("can't evict myself").
 *
 * A reference on obd->obd_uuid_hash is taken under obd->obd_dev_lock
 * and dropped with cfs_hash_putref() on every visible exit path after
 * that point.  A failed lookup is reported with CERROR; a found export
 * is failed through class_fail_export() and then passed to
 * class_export_put().
 *
 * NOTE(review): the CWARN text below spells "adminstrative"; it is a
 * runtime string and is deliberately left unchanged here.
 */
1458 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1460 cfs_hash_t *uuid_hash;
1461 struct obd_export *doomed_exp = NULL;
1462 struct obd_uuid doomed_uuid;
1463 int exports_evicted = 0;
1465 spin_lock(&obd->obd_dev_lock);
1466 if (obd->obd_stopping) {
1467 spin_unlock(&obd->obd_dev_lock);
1468 return exports_evicted;
1470 uuid_hash = obd->obd_uuid_hash;
1471 cfs_hash_getref(uuid_hash);
1472 spin_unlock(&obd->obd_dev_lock);
1474 obd_str2uuid(&doomed_uuid, uuid);
1475 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1476 CERROR("%s: can't evict myself\n", obd->obd_name);
1477 cfs_hash_putref(uuid_hash);
1478 return exports_evicted;
1481 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1483 if (doomed_exp == NULL) {
1484 CERROR("%s: can't disconnect %s: no exports found\n",
1485 obd->obd_name, uuid);
1487 CWARN("%s: evicting %s at adminstrative request\n",
1488 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1489 class_fail_export(doomed_exp);
1490 class_export_put(doomed_exp);
1493 cfs_hash_putref(uuid_hash);
1495 return exports_evicted;
1497 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/*
 * Optional per-export dump hook, compiled in only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero.  It starts out NULL;
 * print_export_data() calls it with the export being printed when its
 * locks argument is non-zero and the hook is not NULL.
 */
1499 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1500 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1501 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log one D_HA debug line describing exp.
 *
 * exp:    export to describe.
 * status: caller-supplied label printed after the obd name (callers in
 *         this file pass "EVICTING", "ACTIVE", "UNLINKED", "DELAYED"
 *         and "ZOMBIE").
 *
 * Printed fields, in order: obd name, status, export pointer, client
 * uuid, peer NID, exp_refcount, (exp_rpc_count, exp_cb_count,
 * exp_locks_count), exp_disconnected, exp_delayed, exp_failed,
 * nreplies, first_reply, "..." when nreplies > 3, exp_last_committed.
 *
 * exp->exp_outstanding_replies is walked under exp->exp_lock.
 * NOTE(review): presumably that walk is what fills nreplies and
 * first_reply -- confirm.
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero, class_export_dump_hook
 * is additionally called if locks is non-zero and the hook is set.
 */
1504 static void print_export_data(struct obd_export *exp, const char *status,
1507 struct ptlrpc_reply_state *rs;
1508 struct ptlrpc_reply_state *first_reply = NULL;
1511 spin_lock(&exp->exp_lock);
1512 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1518 spin_unlock(&exp->exp_lock);
1520 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1521 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1522 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1523 atomic_read(&exp->exp_rpc_count),
1524 atomic_read(&exp->exp_cb_count),
1525 atomic_read(&exp->exp_locks_count),
1526 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1527 nreplies, first_reply, nreplies > 3 ? "..." : "",
1528 exp->exp_last_committed);
1529 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1530 if (locks && class_export_dump_hook != NULL)
1531 class_export_dump_hook(exp);
/*
 * Dump the export lists of obd, plus the global zombie export list, to
 * the debug log.
 *
 * obd:   device whose export lists are dumped.
 * locks: forwarded unchanged to print_export_data().
 *
 * Under obd->obd_dev_lock the obd_exports, obd_unlinked_exports and
 * obd_delayed_exports lists are printed with the labels "ACTIVE",
 * "UNLINKED" and "DELAYED".  Then, under obd_zombie_impexp_lock, the
 * obd_zombie_exports list is printed with the label "ZOMBIE".
 *
 * NOTE(review): the zombie list is walked without any test against
 * obd, so "ZOMBIE" lines may describe exports of other devices.
 */
1535 void dump_exports(struct obd_device *obd, int locks)
1537 struct obd_export *exp;
1539 spin_lock(&obd->obd_dev_lock);
1540 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1541 print_export_data(exp, "ACTIVE", locks);
1542 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1543 print_export_data(exp, "UNLINKED", locks);
1544 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1545 print_export_data(exp, "DELAYED", locks);
1546 spin_unlock(&obd->obd_dev_lock);
1547 spin_lock(&obd_zombie_impexp_lock);
1548 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1549 print_export_data(exp, "ZOMBIE", locks);
1550 spin_unlock(&obd_zombie_impexp_lock);
1552 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until obd->obd_unlinked_exports is empty.
 *
 * Asserts that obd->obd_exports is already empty.  The unlinked list is
 * tested under obd->obd_dev_lock; while it is non-empty the lock is
 * dropped and the caller sleeps in TASK_UNINTERRUPTIBLE state for
 * cfs_time_seconds(waited) before re-taking the lock and testing again.
 * When waited is above 5 and IS_PO2(waited) holds, a console warning
 * with the obd refcount is printed and dump_exports(obd, 1) is called.
 *
 * NOTE(review): the initial value and the update of waited are not
 * visible next to this code; the sleep length and the warning cadence
 * both depend on them.
 */
1554 void obd_exports_barrier(struct obd_device *obd)
1557 LASSERT(list_empty(&obd->obd_exports));
1558 spin_lock(&obd->obd_dev_lock);
1559 while (!list_empty(&obd->obd_unlinked_exports)) {
1560 spin_unlock(&obd->obd_dev_lock);
1561 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1562 cfs_time_seconds(waited));
1563 if (waited > 5 && IS_PO2(waited)) {
1564 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565 "more than %d seconds. "
1566 "The obd refcount = %d. Is it stuck?\n",
1567 obd->obd_name, waited,
1568 atomic_read(&obd->obd_refcount));
1569 dump_exports(obd, 1);
1572 spin_lock(&obd->obd_dev_lock);
1574 spin_unlock(&obd->obd_dev_lock);
1576 EXPORT_SYMBOL(obd_exports_barrier);
1578 /* Total amount of zombies to be destroyed.
 * Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1579 static int zombies_count = 0;
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass takes obd_zombie_impexp_lock, detaches at most one import
 * (head of obd_zombie_imports) and at most one export (head of
 * obd_zombie_exports), and drops the lock.  class_import_destroy() and
 * class_export_destroy() are then called on whatever was detached, so
 * the destructors run without the lock held.  The lock is re-taken
 * briefly after each destroy.  Passes repeat while either an import or
 * an export was found.
 *
 * NOTE(review): the statements inside the short re-locked sections
 * (presumably zombies_count bookkeeping) and the per-pass reset of
 * import/export to NULL should be confirmed.
 */
1582 * kill zombie imports and exports
1584 void obd_zombie_impexp_cull(void)
1586 struct obd_import *import;
1587 struct obd_export *export;
1591 spin_lock(&obd_zombie_impexp_lock);
1594 if (!list_empty(&obd_zombie_imports)) {
1595 import = list_entry(obd_zombie_imports.next,
1598 list_del_init(&import->imp_zombie_chain);
1602 if (!list_empty(&obd_zombie_exports)) {
1603 export = list_entry(obd_zombie_exports.next,
1606 list_del_init(&export->exp_obd_chain);
1609 spin_unlock(&obd_zombie_impexp_lock);
1611 if (import != NULL) {
1612 class_import_destroy(import);
1613 spin_lock(&obd_zombie_impexp_lock);
1615 spin_unlock(&obd_zombie_impexp_lock);
1618 if (export != NULL) {
1619 class_export_destroy(export);
1620 spin_lock(&obd_zombie_impexp_lock);
1622 spin_unlock(&obd_zombie_impexp_lock);
1626 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started;
 * obd_zombie_impexp_init() waits on it. */
1630 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() when its loop has ended;
 * obd_zombie_impexp_stop() waits on it. */
1631 static struct completion obd_zombie_stop;
/* Flag word accessed with set_bit()/test_bit(). */
1632 static unsigned long obd_zombie_flags;
/* Wait queue used by both the zombie thread and obd_zombie_barrier(). */
1633 static wait_queue_head_t obd_zombie_waitq;
/* Set by the zombie thread to its own current_pid(); compared in
 * obd_zombie_barrier() so that the thread does not wait for itself. */
1634 static pid_t obd_zombie_pid;
/* NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit()/test_bit() as
 * the bit argument, so 0x0001 selects bit number 1 of obd_zombie_flags
 * rather than acting as a mask.  All visible users agree with each
 * other, so this is a readability trap rather than a bug. */
1637 OBD_ZOMBIE_STOP = 0x0001,
/*
 * Wait-condition helper for the zombie thread.
 *
 * Under obd_zombie_impexp_lock, rc is computed as true when
 * zombies_count is 0 and OBD_ZOMBIE_STOP is not set, i.e. when the
 * thread has nothing to do.  obd_zombie_impexp_thread() sleeps until
 * the negation of this check holds.
 *
 * arg is not used in the visible code (the thread passes NULL).
 */
1641 * check for work for kill zombie import/export thread.
1643 static int obd_zombie_impexp_check(void *arg)
1647 spin_lock(&obd_zombie_impexp_lock);
1648 rc = (zombies_count == 0) &&
1649 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650 spin_unlock(&obd_zombie_impexp_lock);
1656 * Add export to the obd_zombie thread and notify it.
/*
 * Unlink exp from its current list and queue it for destruction.
 *
 * exp->exp_obd_chain must be on a list (asserted).  It is removed under
 * the obd_dev_lock of exp->exp_obd, then added to obd_zombie_exports
 * under obd_zombie_impexp_lock, and finally the waiters on
 * obd_zombie_waitq are woken through obd_zombie_impexp_notify().
 */
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659 spin_lock(&exp->exp_obd->obd_dev_lock);
1660 LASSERT(!list_empty(&exp->exp_obd_chain));
1661 list_del_init(&exp->exp_obd_chain);
1662 spin_unlock(&exp->exp_obd->obd_dev_lock);
1663 spin_lock(&obd_zombie_impexp_lock);
1665 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666 spin_unlock(&obd_zombie_impexp_lock);
1668 obd_zombie_impexp_notify();
1672 * Add import to the obd_zombie thread and notify it.
/*
 * Queue imp for destruction by the zombie thread.
 *
 * Asserts that imp->imp_sec and imp->imp_rq_pool are already NULL and
 * that imp->imp_zombie_chain is not on any list.  The import is added
 * to obd_zombie_imports under obd_zombie_impexp_lock, after which the
 * waiters on obd_zombie_waitq are woken through
 * obd_zombie_impexp_notify().
 */
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675 LASSERT(imp->imp_sec == NULL);
1676 LASSERT(imp->imp_rq_pool == NULL);
1677 spin_lock(&obd_zombie_impexp_lock);
1678 LASSERT(list_empty(&imp->imp_zombie_chain));
1680 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1681 spin_unlock(&obd_zombie_impexp_lock);
1683 obd_zombie_impexp_notify();
/*
 * Wake every waiter on obd_zombie_waitq.
 *
 * wake_up_all() is used because, as the remark in the body explains,
 * both the zombie thread and obd_zombie_barrier() callers wait on this
 * queue and the thread must not miss the notification.
 */
1687 * notify import/export destroy thread about new zombie.
1689 static void obd_zombie_impexp_notify(void)
1692 * Make sure obd_zomebie_impexp_thread get this notification.
1693 * It is possible this signal only get by obd_zombie_barrier, and
1694 * barrier gulps this notification and sleeps away and hangs ensues
1696 wake_up_all(&obd_zombie_waitq);
/*
 * Wait-condition helper for obd_zombie_barrier().
 *
 * Asserts that OBD_ZOMBIE_STOP is not set, then computes, under
 * obd_zombie_impexp_lock, whether zombies_count is 0.
 */
1700 * check whether obd_zombie is idle
1702 static int obd_zombie_is_idle(void)
1706 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1707 spin_lock(&obd_zombie_impexp_lock);
1708 rc = (zombies_count == 0);
1709 spin_unlock(&obd_zombie_impexp_lock);
1714 * wait until the obd_zombie import/export queues become empty
/*
 * Block the caller on obd_zombie_waitq until obd_zombie_is_idle()
 * holds.
 *
 * A check is made for the caller being the zombie thread itself
 * (current_pid() equal to obd_zombie_pid); per the comment on that
 * branch the thread does not wait for itself.
 */
1716 void obd_zombie_barrier(void)
1718 struct l_wait_info lwi = { 0 };
1720 if (obd_zombie_pid == current_pid())
1721 /* don't wait for myself */
1723 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1725 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Body of the kernel thread started by obd_zombie_impexp_init().
 *
 * Signals obd_zombie_start, records its pid in obd_zombie_pid, then
 * loops until OBD_ZOMBIE_STOP is set: sleep on obd_zombie_waitq until
 * !obd_zombie_impexp_check(NULL) holds, run obd_zombie_impexp_cull(),
 * and wake the queue again for obd_zombie_barrier() callers.  After the
 * loop it signals obd_zombie_stop.
 *
 * NOTE(review): obd_zombie_pid is assigned after
 * complete(&obd_zombie_start), so obd_zombie_impexp_init() may return
 * before the pid is recorded -- confirm this window is harmless.
 */
1729 * destroy zombie export/import thread.
1731 static int obd_zombie_impexp_thread(void *unused)
1733 unshare_fs_struct();
1734 complete(&obd_zombie_start);
1736 obd_zombie_pid = current_pid();
1738 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1739 struct l_wait_info lwi = { 0 };
1741 l_wait_event(obd_zombie_waitq,
1742 !obd_zombie_impexp_check(NULL), &lwi);
1743 obd_zombie_impexp_cull();
1746 * Notify obd_zombie_barrier callers that queues
1749 wake_up(&obd_zombie_waitq);
1752 complete(&obd_zombie_stop);
/*
 * Initialise the zombie lists, lock, completions and wait queue, then
 * start obd_zombie_impexp_thread() as a kthread named "obd_zombid".
 *
 * PTR_ERR(task) is returned on the visible error path
 * (NOTE(review): presumably guarded by an IS_ERR(task) test).
 * Otherwise the function waits for obd_zombie_start, which the thread
 * completes once it is running.
 */
1759 * start destroy zombie import/export thread
1761 int obd_zombie_impexp_init(void)
1765 INIT_LIST_HEAD(&obd_zombie_imports);
1766 INIT_LIST_HEAD(&obd_zombie_exports);
1767 spin_lock_init(&obd_zombie_impexp_lock);
1768 init_completion(&obd_zombie_start);
1769 init_completion(&obd_zombie_stop);
1770 init_waitqueue_head(&obd_zombie_waitq);
1773 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1775 RETURN(PTR_ERR(task));
1777 wait_for_completion(&obd_zombie_start);
/*
 * Ask the zombie thread to exit and wait for it to finish.
 *
 * Sets OBD_ZOMBIE_STOP in obd_zombie_flags, wakes obd_zombie_waitq via
 * obd_zombie_impexp_notify(), and blocks on obd_zombie_stop, which the
 * thread completes after leaving its loop.
 */
1781 * stop destroy zombie import/export thread
1783 void obd_zombie_impexp_stop(void)
1785 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786 obd_zombie_impexp_notify();
1787 wait_for_completion(&obd_zombie_stop);
1790 /***** Kernel-userspace comm helpers *******/
1792 /* Get length of entire message, including header.
 * @param payload_len Length of the payload
 * @returns sizeof(struct kuc_hdr) + payload_len, converted to int
 *
 * NOTE(review): payload_len is not range-checked here; a negative or
 * very large value yields a meaningless length. */
1793 int kuc_len(int payload_len)
1795 return sizeof(struct kuc_hdr) + payload_len;
1797 EXPORT_SYMBOL(kuc_len);
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800 * @param p Pointer to payload area
1801 * @returns Pointer to kuc header
 *
 * The header is taken to be the struct kuc_hdr located immediately
 * before p (computed as (struct kuc_hdr *)p - 1), and its kuc_magic
 * field is asserted to equal KUC_MAGIC.
1803 struct kuc_hdr * kuc_ptr(void *p)
1805 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806 LASSERT(lh->kuc_magic == KUC_MAGIC);
1809 EXPORT_SYMBOL(kuc_ptr);
1811 /* Test if payload is part of kuc message
1812 * @param p Pointer to payload area
 *
 * The struct kuc_hdr-sized area immediately before p is read and its
 * kuc_magic field is compared with KUC_MAGIC.
 * NOTE(review): this reads memory in front of p even when p is not a
 * kuc payload -- callers presumably guarantee that such a read is safe.
1815 int kuc_ispayload(void *p)
1817 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1819 if (kh->kuc_magic == KUC_MAGIC)
1824 EXPORT_SYMBOL(kuc_ispayload);
1826 /* Alloc space for a message, and fill in header
1827 * @return Pointer to payload area
 * @param payload_len Payload length; the total is kuc_len(payload_len)
 * @param transport Value stored in kuc_transport
 * @param type Value stored in kuc_msgtype
 *
 * The header also receives kuc_magic = KUC_MAGIC and kuc_msglen set to
 * the total length.  The returned pointer is the first byte after the
 * header (lh + 1).  ERR_PTR(-ENOMEM) is returned on the visible failure
 * path (NOTE(review): presumably when the allocation fails), so callers
 * must test the result with IS_ERR() rather than against NULL.
1829 void *kuc_alloc(int payload_len, int transport, int type)
1832 int len = kuc_len(payload_len);
1836 return ERR_PTR(-ENOMEM);
1838 lh->kuc_magic = KUC_MAGIC;
1839 lh->kuc_transport = transport;
1840 lh->kuc_msgtype = type;
1841 lh->kuc_msglen = len;
1843 return (void *)(lh + 1);
1845 EXPORT_SYMBOL(kuc_alloc);
1847 /* Takes pointer to payload area.
 * @param p Payload pointer; kuc_ptr() maps it back to the header and
 *          asserts the header magic
 * @param payload_len Used to compute the size handed to OBD_FREE() as
 *          kuc_len(payload_len)
 *
 * NOTE(review): payload_len presumably has to equal the value that was
 * given to kuc_alloc() for this message -- confirm. */
1848 inline void kuc_free(void *p, int payload_len)
1850 struct kuc_hdr *lh = kuc_ptr(p);
1851 OBD_FREE(lh, kuc_len(payload_len));
1853 EXPORT_SYMBOL(kuc_free);