staging: lustre: drop null test before destroy functions
[linux-block.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head      obd_zombie_imports;
54 static struct list_head      obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL)
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         return obd;
77 }
78
79 static void obd_device_free(struct obd_device *obd)
80 {
81         LASSERT(obd != NULL);
82         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84         if (obd->obd_namespace != NULL) {
85                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86                        obd, obd->obd_namespace, obd->obd_force);
87                 LBUG();
88         }
89         lu_ref_fini(&obd->obd_reference);
90         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 }
92
93 struct obd_type *class_search_type(const char *name)
94 {
95         struct list_head *tmp;
96         struct obd_type *type;
97
98         spin_lock(&obd_types_lock);
99         list_for_each(tmp, &obd_types) {
100                 type = list_entry(tmp, struct obd_type, typ_chain);
101                 if (strcmp(type->typ_name, name) == 0) {
102                         spin_unlock(&obd_types_lock);
103                         return type;
104                 }
105         }
106         spin_unlock(&obd_types_lock);
107         return NULL;
108 }
109 EXPORT_SYMBOL(class_search_type);
110
111 struct obd_type *class_get_type(const char *name)
112 {
113         struct obd_type *type = class_search_type(name);
114
115         if (!type) {
116                 const char *modname = name;
117
118                 if (strcmp(modname, "obdfilter") == 0)
119                         modname = "ofd";
120
121                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
122                         modname = LUSTRE_OSP_NAME;
123
124                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
125                         modname = LUSTRE_MDT_NAME;
126
127                 if (!request_module("%s", modname)) {
128                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
129                         type = class_search_type(name);
130                 } else {
131                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132                                            modname);
133                 }
134         }
135         if (type) {
136                 spin_lock(&type->obd_type_lock);
137                 type->typ_refcnt++;
138                 try_module_get(type->typ_dt_ops->o_owner);
139                 spin_unlock(&type->obd_type_lock);
140         }
141         return type;
142 }
143 EXPORT_SYMBOL(class_get_type);
144
145 void class_put_type(struct obd_type *type)
146 {
147         LASSERT(type);
148         spin_lock(&type->obd_type_lock);
149         type->typ_refcnt--;
150         module_put(type->typ_dt_ops->o_owner);
151         spin_unlock(&type->obd_type_lock);
152 }
153 EXPORT_SYMBOL(class_put_type);
154
155 #define CLASS_MAX_NAME 1024
156
157 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
158                         const char *name,
159                         struct lu_device_type *ldt)
160 {
161         struct obd_type *type;
162         int rc = 0;
163
164         /* sanity check */
165         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167         if (class_search_type(name)) {
168                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169                 return -EEXIST;
170         }
171
172         rc = -ENOMEM;
173         type = kzalloc(sizeof(*type), GFP_NOFS);
174         if (!type)
175                 return rc;
176
177         type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
178         type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
179         type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
180
181         if (!type->typ_dt_ops ||
182             !type->typ_md_ops ||
183             !type->typ_name)
184                 goto failed;
185
186         *(type->typ_dt_ops) = *dt_ops;
187         /* md_ops is optional */
188         if (md_ops)
189                 *(type->typ_md_ops) = *md_ops;
190         strcpy(type->typ_name, name);
191         spin_lock_init(&type->obd_type_lock);
192
193         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
194                                                     debugfs_lustre_root,
195                                                     NULL, type);
196         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
197                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
198                                              : -ENOMEM;
199                 type->typ_debugfs_entry = NULL;
200                 goto failed;
201         }
202
203         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
204         if (!type->typ_kobj) {
205                 rc = -ENOMEM;
206                 goto failed;
207         }
208
209         if (ldt != NULL) {
210                 type->typ_lu = ldt;
211                 rc = lu_device_type_init(ldt);
212                 if (rc != 0)
213                         goto failed;
214         }
215
216         spin_lock(&obd_types_lock);
217         list_add(&type->typ_chain, &obd_types);
218         spin_unlock(&obd_types_lock);
219
220         return 0;
221
222  failed:
223         if (type->typ_kobj)
224                 kobject_put(type->typ_kobj);
225         kfree(type->typ_name);
226         kfree(type->typ_md_ops);
227         kfree(type->typ_dt_ops);
228         kfree(type);
229         return rc;
230 }
231 EXPORT_SYMBOL(class_register_type);
232
233 int class_unregister_type(const char *name)
234 {
235         struct obd_type *type = class_search_type(name);
236
237         if (!type) {
238                 CERROR("unknown obd type\n");
239                 return -EINVAL;
240         }
241
242         if (type->typ_refcnt) {
243                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
244                 /* This is a bad situation, let's make the best of it */
245                 /* Remove ops, but leave the name for debugging */
246                 kfree(type->typ_dt_ops);
247                 kfree(type->typ_md_ops);
248                 return -EBUSY;
249         }
250
251         if (type->typ_kobj)
252                 kobject_put(type->typ_kobj);
253
254         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
255                 ldebugfs_remove(&type->typ_debugfs_entry);
256
257         if (type->typ_lu)
258                 lu_device_type_fini(type->typ_lu);
259
260         spin_lock(&obd_types_lock);
261         list_del(&type->typ_chain);
262         spin_unlock(&obd_types_lock);
263         kfree(type->typ_name);
264         kfree(type->typ_dt_ops);
265         kfree(type->typ_md_ops);
266         kfree(type);
267         return 0;
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
270
271 /**
272  * Create a new obd device.
273  *
274  * Find an empty slot in ::obd_devs[], create a new obd device in it.
275  *
276  * \param[in] type_name obd device type string.
277  * \param[in] name      obd device name.
278  *
279  * \retval NULL if create fails, otherwise return the obd device
280  *       pointer created.
281  */
282 struct obd_device *class_newdev(const char *type_name, const char *name)
283 {
284         struct obd_device *result = NULL;
285         struct obd_device *newdev;
286         struct obd_type *type = NULL;
287         int i;
288         int new_obd_minor = 0;
289
290         if (strlen(name) >= MAX_OBD_NAME) {
291                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
292                 return ERR_PTR(-EINVAL);
293         }
294
295         type = class_get_type(type_name);
296         if (!type) {
297                 CERROR("OBD: unknown type: %s\n", type_name);
298                 return ERR_PTR(-ENODEV);
299         }
300
301         newdev = obd_device_alloc();
302         if (!newdev) {
303                 result = ERR_PTR(-ENOMEM);
304                 goto out_type;
305         }
306
307         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
308
309         write_lock(&obd_dev_lock);
310         for (i = 0; i < class_devno_max(); i++) {
311                 struct obd_device *obd = class_num2obd(i);
312
313                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
314                         CERROR("Device %s already exists at %d, won't add\n",
315                                name, i);
316                         if (result) {
317                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318                                          "%p obd_magic %08x != %08x\n", result,
319                                          result->obd_magic, OBD_DEVICE_MAGIC);
320                                 LASSERTF(result->obd_minor == new_obd_minor,
321                                          "%p obd_minor %d != %d\n", result,
322                                          result->obd_minor, new_obd_minor);
323
324                                 obd_devs[result->obd_minor] = NULL;
325                                 result->obd_name[0] = '\0';
326                          }
327                         result = ERR_PTR(-EEXIST);
328                         break;
329                 }
330                 if (!result && !obd) {
331                         result = newdev;
332                         result->obd_minor = i;
333                         new_obd_minor = i;
334                         result->obd_type = type;
335                         strncpy(result->obd_name, name,
336                                 sizeof(result->obd_name) - 1);
337                         obd_devs[i] = result;
338                 }
339         }
340         write_unlock(&obd_dev_lock);
341
342         if (!result && i >= class_devno_max()) {
343                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
344                        class_devno_max());
345                 result = ERR_PTR(-EOVERFLOW);
346                 goto out;
347         }
348
349         if (IS_ERR(result))
350                 goto out;
351
352         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
353                result->obd_name, result);
354
355         return result;
356 out:
357         obd_device_free(newdev);
358 out_type:
359         class_put_type(type);
360         return result;
361 }
362
363 void class_release_dev(struct obd_device *obd)
364 {
365         struct obd_type *obd_type = obd->obd_type;
366
367         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
368                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
369         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
370                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
371         LASSERT(obd_type != NULL);
372
373         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
374                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
375
376         write_lock(&obd_dev_lock);
377         obd_devs[obd->obd_minor] = NULL;
378         write_unlock(&obd_dev_lock);
379         obd_device_free(obd);
380
381         class_put_type(obd_type);
382 }
383
384 int class_name2dev(const char *name)
385 {
386         int i;
387
388         if (!name)
389                 return -1;
390
391         read_lock(&obd_dev_lock);
392         for (i = 0; i < class_devno_max(); i++) {
393                 struct obd_device *obd = class_num2obd(i);
394
395                 if (obd && strcmp(name, obd->obd_name) == 0) {
396                         /* Make sure we finished attaching before we give
397                            out any references */
398                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
399                         if (obd->obd_attached) {
400                                 read_unlock(&obd_dev_lock);
401                                 return i;
402                         }
403                         break;
404                 }
405         }
406         read_unlock(&obd_dev_lock);
407
408         return -1;
409 }
410 EXPORT_SYMBOL(class_name2dev);
411
412 struct obd_device *class_name2obd(const char *name)
413 {
414         int dev = class_name2dev(name);
415
416         if (dev < 0 || dev > class_devno_max())
417                 return NULL;
418         return class_num2obd(dev);
419 }
420 EXPORT_SYMBOL(class_name2obd);
421
422 int class_uuid2dev(struct obd_uuid *uuid)
423 {
424         int i;
425
426         read_lock(&obd_dev_lock);
427         for (i = 0; i < class_devno_max(); i++) {
428                 struct obd_device *obd = class_num2obd(i);
429
430                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
431                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
432                         read_unlock(&obd_dev_lock);
433                         return i;
434                 }
435         }
436         read_unlock(&obd_dev_lock);
437
438         return -1;
439 }
440 EXPORT_SYMBOL(class_uuid2dev);
441
442 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
443 {
444         int dev = class_uuid2dev(uuid);
445
446         if (dev < 0)
447                 return NULL;
448         return class_num2obd(dev);
449 }
450 EXPORT_SYMBOL(class_uuid2obd);
451
452 /**
453  * Get obd device from ::obd_devs[]
454  *
455  * \param num [in] array index
456  *
457  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
458  *       otherwise return the obd device there.
459  */
460 struct obd_device *class_num2obd(int num)
461 {
462         struct obd_device *obd = NULL;
463
464         if (num < class_devno_max()) {
465                 obd = obd_devs[num];
466                 if (!obd)
467                         return NULL;
468
469                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
470                          "%p obd_magic %08x != %08x\n",
471                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
472                 LASSERTF(obd->obd_minor == num,
473                          "%p obd_minor %0d != %0d\n",
474                          obd, obd->obd_minor, num);
475         }
476
477         return obd;
478 }
479 EXPORT_SYMBOL(class_num2obd);
480
481 /**
482  * Get obd devices count. Device in any
483  *    state are counted
484  * \retval obd device count
485  */
486 int get_devices_count(void)
487 {
488         int index, max_index = class_devno_max(), dev_count = 0;
489
490         read_lock(&obd_dev_lock);
491         for (index = 0; index <= max_index; index++) {
492                 struct obd_device *obd = class_num2obd(index);
493
494                 if (obd != NULL)
495                         dev_count++;
496         }
497         read_unlock(&obd_dev_lock);
498
499         return dev_count;
500 }
501 EXPORT_SYMBOL(get_devices_count);
502
503 void class_obd_list(void)
504 {
505         char *status;
506         int i;
507
508         read_lock(&obd_dev_lock);
509         for (i = 0; i < class_devno_max(); i++) {
510                 struct obd_device *obd = class_num2obd(i);
511
512                 if (!obd)
513                         continue;
514                 if (obd->obd_stopping)
515                         status = "ST";
516                 else if (obd->obd_set_up)
517                         status = "UP";
518                 else if (obd->obd_attached)
519                         status = "AT";
520                 else
521                         status = "--";
522                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
523                          i, status, obd->obd_type->typ_name,
524                          obd->obd_name, obd->obd_uuid.uuid,
525                          atomic_read(&obd->obd_refcount));
526         }
527         read_unlock(&obd_dev_lock);
528 }
529
530 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
531    specified, then only the client with that uuid is returned,
532    otherwise any client connected to the tgt is returned. */
533 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
534                                           const char *typ_name,
535                                           struct obd_uuid *grp_uuid)
536 {
537         int i;
538
539         read_lock(&obd_dev_lock);
540         for (i = 0; i < class_devno_max(); i++) {
541                 struct obd_device *obd = class_num2obd(i);
542
543                 if (!obd)
544                         continue;
545                 if ((strncmp(obd->obd_type->typ_name, typ_name,
546                              strlen(typ_name)) == 0)) {
547                         if (obd_uuid_equals(tgt_uuid,
548                                             &obd->u.cli.cl_target_uuid) &&
549                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
550                                                          &obd->obd_uuid) : 1)) {
551                                 read_unlock(&obd_dev_lock);
552                                 return obd;
553                         }
554                 }
555         }
556         read_unlock(&obd_dev_lock);
557
558         return NULL;
559 }
560 EXPORT_SYMBOL(class_find_client_obd);
561
562 /* Iterate the obd_device list looking devices have grp_uuid. Start
563    searching at *next, and if a device is found, the next index to look
564    at is saved in *next. If next is NULL, then the first matching device
565    will always be returned. */
566 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
567 {
568         int i;
569
570         if (!next)
571                 i = 0;
572         else if (*next >= 0 && *next < class_devno_max())
573                 i = *next;
574         else
575                 return NULL;
576
577         read_lock(&obd_dev_lock);
578         for (; i < class_devno_max(); i++) {
579                 struct obd_device *obd = class_num2obd(i);
580
581                 if (!obd)
582                         continue;
583                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
584                         if (next)
585                                 *next = i+1;
586                         read_unlock(&obd_dev_lock);
587                         return obd;
588                 }
589         }
590         read_unlock(&obd_dev_lock);
591
592         return NULL;
593 }
594 EXPORT_SYMBOL(class_devices_in_group);
595
596 /**
597  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
598  * adjust sptlrpc settings accordingly.
599  */
600 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
601 {
602         struct obd_device  *obd;
603         const char       *type;
604         int              i, rc = 0, rc2;
605
606         LASSERT(namelen > 0);
607
608         read_lock(&obd_dev_lock);
609         for (i = 0; i < class_devno_max(); i++) {
610                 obd = class_num2obd(i);
611
612                 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
613                         continue;
614
615                 /* only notify mdc, osc, mdt, ost */
616                 type = obd->obd_type->typ_name;
617                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
618                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
619                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
620                     strcmp(type, LUSTRE_OST_NAME) != 0)
621                         continue;
622
623                 if (strncmp(obd->obd_name, fsname, namelen))
624                         continue;
625
626                 class_incref(obd, __func__, obd);
627                 read_unlock(&obd_dev_lock);
628                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
629                                          sizeof(KEY_SPTLRPC_CONF),
630                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
631                 rc = rc ? rc : rc2;
632                 class_decref(obd, __func__, obd);
633                 read_lock(&obd_dev_lock);
634         }
635         read_unlock(&obd_dev_lock);
636         return rc;
637 }
638 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
639
640 void obd_cleanup_caches(void)
641 {
642         kmem_cache_destroy(obd_device_cachep);
643         obd_device_cachep = NULL;
644         kmem_cache_destroy(obdo_cachep);
645         obdo_cachep = NULL;
646         kmem_cache_destroy(import_cachep);
647         import_cachep = NULL;
648         kmem_cache_destroy(capa_cachep);
649         capa_cachep = NULL;
650 }
651
652 int obd_init_caches(void)
653 {
654         LASSERT(!obd_device_cachep);
655         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
656                                                  sizeof(struct obd_device),
657                                                  0, 0, NULL);
658         if (!obd_device_cachep)
659                 goto out;
660
661         LASSERT(!obdo_cachep);
662         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
663                                            0, 0, NULL);
664         if (!obdo_cachep)
665                 goto out;
666
667         LASSERT(!import_cachep);
668         import_cachep = kmem_cache_create("ll_import_cache",
669                                              sizeof(struct obd_import),
670                                              0, 0, NULL);
671         if (!import_cachep)
672                 goto out;
673
674         LASSERT(!capa_cachep);
675         capa_cachep = kmem_cache_create("capa_cache",
676                                            sizeof(struct obd_capa), 0, 0, NULL);
677         if (!capa_cachep)
678                 goto out;
679
680         return 0;
681  out:
682         obd_cleanup_caches();
683         return -ENOMEM;
684
685 }
686
687 /* map connection to client */
688 struct obd_export *class_conn2export(struct lustre_handle *conn)
689 {
690         struct obd_export *export;
691
692         if (!conn) {
693                 CDEBUG(D_CACHE, "looking for null handle\n");
694                 return NULL;
695         }
696
697         if (conn->cookie == -1) {  /* this means assign a new connection */
698                 CDEBUG(D_CACHE, "want a new connection\n");
699                 return NULL;
700         }
701
702         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
703         export = class_handle2object(conn->cookie);
704         return export;
705 }
706 EXPORT_SYMBOL(class_conn2export);
707
708 struct obd_device *class_exp2obd(struct obd_export *exp)
709 {
710         if (exp)
711                 return exp->exp_obd;
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_exp2obd);
715
716 struct obd_device *class_conn2obd(struct lustre_handle *conn)
717 {
718         struct obd_export *export;
719
720         export = class_conn2export(conn);
721         if (export) {
722                 struct obd_device *obd = export->exp_obd;
723
724                 class_export_put(export);
725                 return obd;
726         }
727         return NULL;
728 }
729 EXPORT_SYMBOL(class_conn2obd);
730
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 {
733         struct obd_device *obd = exp->exp_obd;
734
735         if (!obd)
736                 return NULL;
737         return obd->u.cli.cl_import;
738 }
739 EXPORT_SYMBOL(class_exp2cliimp);
740
741 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
742 {
743         struct obd_device *obd = class_conn2obd(conn);
744
745         if (!obd)
746                 return NULL;
747         return obd->u.cli.cl_import;
748 }
749 EXPORT_SYMBOL(class_conn2cliimp);
750
751 /* Export management functions */
752 static void class_export_destroy(struct obd_export *exp)
753 {
754         struct obd_device *obd = exp->exp_obd;
755
756         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
757         LASSERT(obd != NULL);
758
759         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
760                exp->exp_client_uuid.uuid, obd->obd_name);
761
762         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
763         if (exp->exp_connection)
764                 ptlrpc_put_connection_superhack(exp->exp_connection);
765
766         LASSERT(list_empty(&exp->exp_outstanding_replies));
767         LASSERT(list_empty(&exp->exp_uncommitted_replies));
768         LASSERT(list_empty(&exp->exp_req_replay_queue));
769         LASSERT(list_empty(&exp->exp_hp_rpcs));
770         obd_destroy_export(exp);
771         class_decref(obd, "export", exp);
772
773         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
774 }
775
776 static void export_handle_addref(void *export)
777 {
778         class_export_get(export);
779 }
780
781 static struct portals_handle_ops export_handle_ops = {
782         .hop_addref = export_handle_addref,
783         .hop_free   = NULL,
784 };
785
786 struct obd_export *class_export_get(struct obd_export *exp)
787 {
788         atomic_inc(&exp->exp_refcount);
789         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
790                atomic_read(&exp->exp_refcount));
791         return exp;
792 }
793 EXPORT_SYMBOL(class_export_get);
794
795 void class_export_put(struct obd_export *exp)
796 {
797         LASSERT(exp != NULL);
798         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
799         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
800                atomic_read(&exp->exp_refcount) - 1);
801
802         if (atomic_dec_and_test(&exp->exp_refcount)) {
803                 LASSERT(!list_empty(&exp->exp_obd_chain));
804                 CDEBUG(D_IOCTL, "final put %p/%s\n",
805                        exp, exp->exp_client_uuid.uuid);
806
807                 /* release nid stat refererence */
808                 lprocfs_exp_cleanup(exp);
809
810                 obd_zombie_export_add(exp);
811         }
812 }
813 EXPORT_SYMBOL(class_export_put);
814
815 /* Creates a new export, adds it to the hash table, and returns a
816  * pointer to it. The refcount is 2: one for the hash reference, and
817  * one for the pointer returned by this function. */
818 struct obd_export *class_new_export(struct obd_device *obd,
819                                     struct obd_uuid *cluuid)
820 {
821         struct obd_export *export;
822         struct cfs_hash *hash = NULL;
823         int rc = 0;
824
825         export = kzalloc(sizeof(*export), GFP_NOFS);
826         if (!export)
827                 return ERR_PTR(-ENOMEM);
828
829         export->exp_conn_cnt = 0;
830         export->exp_lock_hash = NULL;
831         export->exp_flock_hash = NULL;
832         atomic_set(&export->exp_refcount, 2);
833         atomic_set(&export->exp_rpc_count, 0);
834         atomic_set(&export->exp_cb_count, 0);
835         atomic_set(&export->exp_locks_count, 0);
836 #if LUSTRE_TRACKS_LOCK_EXP_REFS
837         INIT_LIST_HEAD(&export->exp_locks_list);
838         spin_lock_init(&export->exp_locks_list_guard);
839 #endif
840         atomic_set(&export->exp_replay_count, 0);
841         export->exp_obd = obd;
842         INIT_LIST_HEAD(&export->exp_outstanding_replies);
843         spin_lock_init(&export->exp_uncommitted_replies_lock);
844         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
845         INIT_LIST_HEAD(&export->exp_req_replay_queue);
846         INIT_LIST_HEAD(&export->exp_handle.h_link);
847         INIT_LIST_HEAD(&export->exp_hp_rpcs);
848         class_handle_hash(&export->exp_handle, &export_handle_ops);
849         export->exp_last_request_time = get_seconds();
850         spin_lock_init(&export->exp_lock);
851         spin_lock_init(&export->exp_rpc_lock);
852         INIT_HLIST_NODE(&export->exp_uuid_hash);
853         INIT_HLIST_NODE(&export->exp_nid_hash);
854         spin_lock_init(&export->exp_bl_list_lock);
855         INIT_LIST_HEAD(&export->exp_bl_list);
856
857         export->exp_sp_peer = LUSTRE_SP_ANY;
858         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
859         export->exp_client_uuid = *cluuid;
860         obd_init_export(export);
861
862         spin_lock(&obd->obd_dev_lock);
863         /* shouldn't happen, but might race */
864         if (obd->obd_stopping) {
865                 rc = -ENODEV;
866                 goto exit_unlock;
867         }
868
869         hash = cfs_hash_getref(obd->obd_uuid_hash);
870         if (!hash) {
871                 rc = -ENODEV;
872                 goto exit_unlock;
873         }
874         spin_unlock(&obd->obd_dev_lock);
875
876         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
877                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
878                 if (rc != 0) {
879                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
880                                       obd->obd_name, cluuid->uuid, rc);
881                         rc = -EALREADY;
882                         goto exit_err;
883                 }
884         }
885
886         spin_lock(&obd->obd_dev_lock);
887         if (obd->obd_stopping) {
888                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
889                 rc = -ENODEV;
890                 goto exit_unlock;
891         }
892
893         class_incref(obd, "export", export);
894         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
895         list_add_tail(&export->exp_obd_chain_timed,
896                           &export->exp_obd->obd_exports_timed);
897         export->exp_obd->obd_num_exports++;
898         spin_unlock(&obd->obd_dev_lock);
899         cfs_hash_putref(hash);
900         return export;
901
902 exit_unlock:
903         spin_unlock(&obd->obd_dev_lock);
904 exit_err:
905         if (hash)
906                 cfs_hash_putref(hash);
907         class_handle_unhash(&export->exp_handle);
908         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
909         obd_destroy_export(export);
910         kfree(export);
911         return ERR_PTR(rc);
912 }
913 EXPORT_SYMBOL(class_new_export);
914
915 void class_unlink_export(struct obd_export *exp)
916 {
917         class_handle_unhash(&exp->exp_handle);
918
919         spin_lock(&exp->exp_obd->obd_dev_lock);
920         /* delete an uuid-export hashitem from hashtables */
921         if (!hlist_unhashed(&exp->exp_uuid_hash))
922                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
923                              &exp->exp_client_uuid,
924                              &exp->exp_uuid_hash);
925
926         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
927         list_del_init(&exp->exp_obd_chain_timed);
928         exp->exp_obd->obd_num_exports--;
929         spin_unlock(&exp->exp_obd->obd_dev_lock);
930         class_export_put(exp);
931 }
932 EXPORT_SYMBOL(class_unlink_export);
933
934 /* Import management functions */
935 static void class_import_destroy(struct obd_import *imp)
936 {
937         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
938                 imp->imp_obd->obd_name);
939
940         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
941
942         ptlrpc_put_connection_superhack(imp->imp_connection);
943
944         while (!list_empty(&imp->imp_conn_list)) {
945                 struct obd_import_conn *imp_conn;
946
947                 imp_conn = list_entry(imp->imp_conn_list.next,
948                                           struct obd_import_conn, oic_item);
949                 list_del_init(&imp_conn->oic_item);
950                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
951                 kfree(imp_conn);
952         }
953
954         LASSERT(!imp->imp_sec);
955         class_decref(imp->imp_obd, "import", imp);
956         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
957 }
958
959 static void import_handle_addref(void *import)
960 {
961         class_import_get(import);
962 }
963
964 static struct portals_handle_ops import_handle_ops = {
965         .hop_addref = import_handle_addref,
966         .hop_free   = NULL,
967 };
968
969 struct obd_import *class_import_get(struct obd_import *import)
970 {
971         atomic_inc(&import->imp_refcount);
972         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
973                atomic_read(&import->imp_refcount),
974                import->imp_obd->obd_name);
975         return import;
976 }
977 EXPORT_SYMBOL(class_import_get);
978
979 void class_import_put(struct obd_import *imp)
980 {
981         LASSERT(list_empty(&imp->imp_zombie_chain));
982         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
983
984         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
985                atomic_read(&imp->imp_refcount) - 1,
986                imp->imp_obd->obd_name);
987
988         if (atomic_dec_and_test(&imp->imp_refcount)) {
989                 CDEBUG(D_INFO, "final put import %p\n", imp);
990                 obd_zombie_import_add(imp);
991         }
992
993         /* catch possible import put race */
994         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
995 }
996 EXPORT_SYMBOL(class_import_put);
997
998 static void init_imp_at(struct imp_at *at)
999 {
1000         int i;
1001
1002         at_init(&at->iat_net_latency, 0, 0);
1003         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1004                 /* max service estimates are tracked on the server side, so
1005                    don't use the AT history here, just use the last reported
1006                    val. (But keep hist for proc histogram, worst_ever) */
1007                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1008                         AT_FLG_NOHIST);
1009         }
1010 }
1011
1012 struct obd_import *class_new_import(struct obd_device *obd)
1013 {
1014         struct obd_import *imp;
1015
1016         imp = kzalloc(sizeof(*imp), GFP_NOFS);
1017         if (!imp)
1018                 return NULL;
1019
1020         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1021         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1022         INIT_LIST_HEAD(&imp->imp_replay_list);
1023         INIT_LIST_HEAD(&imp->imp_sending_list);
1024         INIT_LIST_HEAD(&imp->imp_delayed_list);
1025         INIT_LIST_HEAD(&imp->imp_committed_list);
1026         imp->imp_replay_cursor = &imp->imp_committed_list;
1027         spin_lock_init(&imp->imp_lock);
1028         imp->imp_last_success_conn = 0;
1029         imp->imp_state = LUSTRE_IMP_NEW;
1030         imp->imp_obd = class_incref(obd, "import", imp);
1031         mutex_init(&imp->imp_sec_mutex);
1032         init_waitqueue_head(&imp->imp_recovery_waitq);
1033
1034         atomic_set(&imp->imp_refcount, 2);
1035         atomic_set(&imp->imp_unregistering, 0);
1036         atomic_set(&imp->imp_inflight, 0);
1037         atomic_set(&imp->imp_replay_inflight, 0);
1038         atomic_set(&imp->imp_inval_count, 0);
1039         INIT_LIST_HEAD(&imp->imp_conn_list);
1040         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1041         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1042         init_imp_at(&imp->imp_at);
1043
1044         /* the default magic is V2, will be used in connect RPC, and
1045          * then adjusted according to the flags in request/reply. */
1046         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1047
1048         return imp;
1049 }
1050 EXPORT_SYMBOL(class_new_import);
1051
1052 void class_destroy_import(struct obd_import *import)
1053 {
1054         LASSERT(import != NULL);
1055         LASSERT(import != LP_POISON);
1056
1057         class_handle_unhash(&import->imp_handle);
1058
1059         spin_lock(&import->imp_lock);
1060         import->imp_generation++;
1061         spin_unlock(&import->imp_lock);
1062         class_import_put(import);
1063 }
1064 EXPORT_SYMBOL(class_destroy_import);
1065
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1067
1068 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1069 {
1070         spin_lock(&exp->exp_locks_list_guard);
1071
1072         LASSERT(lock->l_exp_refs_nr >= 0);
1073
1074         if (lock->l_exp_refs_target != NULL &&
1075             lock->l_exp_refs_target != exp) {
1076                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1077                               exp, lock, lock->l_exp_refs_target);
1078         }
1079         if ((lock->l_exp_refs_nr++) == 0) {
1080                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1081                 lock->l_exp_refs_target = exp;
1082         }
1083         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1084                lock, exp, lock->l_exp_refs_nr);
1085         spin_unlock(&exp->exp_locks_list_guard);
1086 }
1087 EXPORT_SYMBOL(__class_export_add_lock_ref);
1088
1089 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1090 {
1091         spin_lock(&exp->exp_locks_list_guard);
1092         LASSERT(lock->l_exp_refs_nr > 0);
1093         if (lock->l_exp_refs_target != exp) {
1094                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1095                               lock, lock->l_exp_refs_target, exp);
1096         }
1097         if (-- lock->l_exp_refs_nr == 0) {
1098                 list_del_init(&lock->l_exp_refs_link);
1099                 lock->l_exp_refs_target = NULL;
1100         }
1101         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102                lock, exp, lock->l_exp_refs_nr);
1103         spin_unlock(&exp->exp_locks_list_guard);
1104 }
1105 EXPORT_SYMBOL(__class_export_del_lock_ref);
1106 #endif
1107
1108 /* A connection defines an export context in which preallocation can
1109    be managed. This releases the export pointer reference, and returns
1110    the export handle, so the export refcount is 1 when this function
1111    returns. */
1112 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1113                   struct obd_uuid *cluuid)
1114 {
1115         struct obd_export *export;
1116
1117         LASSERT(conn != NULL);
1118         LASSERT(obd != NULL);
1119         LASSERT(cluuid != NULL);
1120
1121         export = class_new_export(obd, cluuid);
1122         if (IS_ERR(export))
1123                 return PTR_ERR(export);
1124
1125         conn->cookie = export->exp_handle.h_cookie;
1126         class_export_put(export);
1127
1128         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1129                cluuid->uuid, conn->cookie);
1130         return 0;
1131 }
1132 EXPORT_SYMBOL(class_connect);
1133
1134 /* if export is involved in recovery then clean up related things */
1135 static void class_export_recovery_cleanup(struct obd_export *exp)
1136 {
1137         struct obd_device *obd = exp->exp_obd;
1138
1139         spin_lock(&obd->obd_recovery_task_lock);
1140         if (exp->exp_delayed)
1141                 obd->obd_delayed_clients--;
1142         if (obd->obd_recovering) {
1143                 if (exp->exp_in_recovery) {
1144                         spin_lock(&exp->exp_lock);
1145                         exp->exp_in_recovery = 0;
1146                         spin_unlock(&exp->exp_lock);
1147                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1148                         atomic_dec(&obd->obd_connected_clients);
1149                 }
1150
1151                 /* if called during recovery then should update
1152                  * obd_stale_clients counter,
1153                  * lightweight exports are not counted */
1154                 if (exp->exp_failed &&
1155                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1156                         exp->exp_obd->obd_stale_clients++;
1157         }
1158         spin_unlock(&obd->obd_recovery_task_lock);
1159
1160         spin_lock(&exp->exp_lock);
1161         /** Cleanup req replay fields */
1162         if (exp->exp_req_replay_needed) {
1163                 exp->exp_req_replay_needed = 0;
1164
1165                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1166                 atomic_dec(&obd->obd_req_replay_clients);
1167         }
1168
1169         /** Cleanup lock replay data */
1170         if (exp->exp_lock_replay_needed) {
1171                 exp->exp_lock_replay_needed = 0;
1172
1173                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1174                 atomic_dec(&obd->obd_lock_replay_clients);
1175         }
1176         spin_unlock(&exp->exp_lock);
1177 }
1178
1179 /* This function removes 1-3 references from the export:
1180  * 1 - for export pointer passed
1181  * and if disconnect really need
1182  * 2 - removing from hash
1183  * 3 - in client_unlink_export
1184  * The export pointer passed to this function can destroyed */
1185 int class_disconnect(struct obd_export *export)
1186 {
1187         int already_disconnected;
1188
1189         if (!export) {
1190                 CWARN("attempting to free NULL export %p\n", export);
1191                 return -EINVAL;
1192         }
1193
1194         spin_lock(&export->exp_lock);
1195         already_disconnected = export->exp_disconnected;
1196         export->exp_disconnected = 1;
1197         spin_unlock(&export->exp_lock);
1198
1199         /* class_cleanup(), abort_recovery(), and class_fail_export()
1200          * all end up in here, and if any of them race we shouldn't
1201          * call extra class_export_puts(). */
1202         if (already_disconnected) {
1203                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1204                 goto no_disconn;
1205         }
1206
1207         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1208                export->exp_handle.h_cookie);
1209
1210         if (!hlist_unhashed(&export->exp_nid_hash))
1211                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1212                              &export->exp_connection->c_peer.nid,
1213                              &export->exp_nid_hash);
1214
1215         class_export_recovery_cleanup(export);
1216         class_unlink_export(export);
1217 no_disconn:
1218         class_export_put(export);
1219         return 0;
1220 }
1221 EXPORT_SYMBOL(class_disconnect);
1222
1223 /* Return non-zero for a fully connected export */
1224 int class_connected_export(struct obd_export *exp)
1225 {
1226         if (exp) {
1227                 int connected;
1228
1229                 spin_lock(&exp->exp_lock);
1230                 connected = exp->exp_conn_cnt > 0;
1231                 spin_unlock(&exp->exp_lock);
1232                 return connected;
1233         }
1234         return 0;
1235 }
1236 EXPORT_SYMBOL(class_connected_export);
1237
1238 static void class_disconnect_export_list(struct list_head *list,
1239                                          enum obd_option flags)
1240 {
1241         int rc;
1242         struct obd_export *exp;
1243
1244         /* It's possible that an export may disconnect itself, but
1245          * nothing else will be added to this list. */
1246         while (!list_empty(list)) {
1247                 exp = list_entry(list->next, struct obd_export,
1248                                      exp_obd_chain);
1249                 /* need for safe call CDEBUG after obd_disconnect */
1250                 class_export_get(exp);
1251
1252                 spin_lock(&exp->exp_lock);
1253                 exp->exp_flags = flags;
1254                 spin_unlock(&exp->exp_lock);
1255
1256                 if (obd_uuid_equals(&exp->exp_client_uuid,
1257                                     &exp->exp_obd->obd_uuid)) {
1258                         CDEBUG(D_HA,
1259                                "exp %p export uuid == obd uuid, don't discon\n",
1260                                exp);
1261                         /* Need to delete this now so we don't end up pointing
1262                          * to work_list later when this export is cleaned up. */
1263                         list_del_init(&exp->exp_obd_chain);
1264                         class_export_put(exp);
1265                         continue;
1266                 }
1267
1268                 class_export_get(exp);
1269                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1270                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1271                        exp, exp->exp_last_request_time);
1272                 /* release one export reference anyway */
1273                 rc = obd_disconnect(exp);
1274
1275                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1276                        obd_export_nid2str(exp), exp, rc);
1277                 class_export_put(exp);
1278         }
1279 }
1280
1281 void class_disconnect_exports(struct obd_device *obd)
1282 {
1283         struct list_head work_list;
1284
1285         /* Move all of the exports from obd_exports to a work list, en masse. */
1286         INIT_LIST_HEAD(&work_list);
1287         spin_lock(&obd->obd_dev_lock);
1288         list_splice_init(&obd->obd_exports, &work_list);
1289         list_splice_init(&obd->obd_delayed_exports, &work_list);
1290         spin_unlock(&obd->obd_dev_lock);
1291
1292         if (!list_empty(&work_list)) {
1293                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1294                        obd->obd_minor, obd);
1295                 class_disconnect_export_list(&work_list,
1296                                              exp_flags_from_obd(obd));
1297         } else
1298                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1299                        obd->obd_minor, obd);
1300 }
1301 EXPORT_SYMBOL(class_disconnect_exports);
1302
1303 /* Remove exports that have not completed recovery.
1304  */
1305 void class_disconnect_stale_exports(struct obd_device *obd,
1306                                     int (*test_export)(struct obd_export *))
1307 {
1308         struct list_head work_list;
1309         struct obd_export *exp, *n;
1310         int evicted = 0;
1311
1312         INIT_LIST_HEAD(&work_list);
1313         spin_lock(&obd->obd_dev_lock);
1314         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1315                                      exp_obd_chain) {
1316                 /* don't count self-export as client */
1317                 if (obd_uuid_equals(&exp->exp_client_uuid,
1318                                     &exp->exp_obd->obd_uuid))
1319                         continue;
1320
1321                 /* don't evict clients which have no slot in last_rcvd
1322                  * (e.g. lightweight connection) */
1323                 if (exp->exp_target_data.ted_lr_idx == -1)
1324                         continue;
1325
1326                 spin_lock(&exp->exp_lock);
1327                 if (exp->exp_failed || test_export(exp)) {
1328                         spin_unlock(&exp->exp_lock);
1329                         continue;
1330                 }
1331                 exp->exp_failed = 1;
1332                 spin_unlock(&exp->exp_lock);
1333
1334                 list_move(&exp->exp_obd_chain, &work_list);
1335                 evicted++;
1336                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1337                        obd->obd_name, exp->exp_client_uuid.uuid,
1338                        !exp->exp_connection ? "<unknown>" :
1339                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1340                 print_export_data(exp, "EVICTING", 0);
1341         }
1342         spin_unlock(&obd->obd_dev_lock);
1343
1344         if (evicted)
1345                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1346                               obd->obd_name, evicted);
1347
1348         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1349                                                  OBD_OPT_ABORT_RECOV);
1350 }
1351 EXPORT_SYMBOL(class_disconnect_stale_exports);
1352
1353 void class_fail_export(struct obd_export *exp)
1354 {
1355         int rc, already_failed;
1356
1357         spin_lock(&exp->exp_lock);
1358         already_failed = exp->exp_failed;
1359         exp->exp_failed = 1;
1360         spin_unlock(&exp->exp_lock);
1361
1362         if (already_failed) {
1363                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1364                        exp, exp->exp_client_uuid.uuid);
1365                 return;
1366         }
1367
1368         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1369                exp, exp->exp_client_uuid.uuid);
1370
1371         if (obd_dump_on_timeout)
1372                 libcfs_debug_dumplog();
1373
1374         /* need for safe call CDEBUG after obd_disconnect */
1375         class_export_get(exp);
1376
1377         /* Most callers into obd_disconnect are removing their own reference
1378          * (request, for example) in addition to the one from the hash table.
1379          * We don't have such a reference here, so make one. */
1380         class_export_get(exp);
1381         rc = obd_disconnect(exp);
1382         if (rc)
1383                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1384         else
1385                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1386                        exp, exp->exp_client_uuid.uuid);
1387         class_export_put(exp);
1388 }
1389 EXPORT_SYMBOL(class_fail_export);
1390
1391 char *obd_export_nid2str(struct obd_export *exp)
1392 {
1393         if (exp->exp_connection != NULL)
1394                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1395
1396         return "(no nid)";
1397 }
1398 EXPORT_SYMBOL(obd_export_nid2str);
1399
1400 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1401 {
1402         struct cfs_hash *nid_hash;
1403         struct obd_export *doomed_exp = NULL;
1404         int exports_evicted = 0;
1405
1406         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1407
1408         spin_lock(&obd->obd_dev_lock);
1409         /* umount has run already, so evict thread should leave
1410          * its task to umount thread now */
1411         if (obd->obd_stopping) {
1412                 spin_unlock(&obd->obd_dev_lock);
1413                 return exports_evicted;
1414         }
1415         nid_hash = obd->obd_nid_hash;
1416         cfs_hash_getref(nid_hash);
1417         spin_unlock(&obd->obd_dev_lock);
1418
1419         do {
1420                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1421                 if (!doomed_exp)
1422                         break;
1423
1424                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1425                          "nid %s found, wanted nid %s, requested nid %s\n",
1426                          obd_export_nid2str(doomed_exp),
1427                          libcfs_nid2str(nid_key), nid);
1428                 LASSERTF(doomed_exp != obd->obd_self_export,
1429                          "self-export is hashed by NID?\n");
1430                 exports_evicted++;
1431                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1432                               obd->obd_name,
1433                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1434                               obd_export_nid2str(doomed_exp));
1435                 class_fail_export(doomed_exp);
1436                 class_export_put(doomed_exp);
1437         } while (1);
1438
1439         cfs_hash_putref(nid_hash);
1440
1441         if (!exports_evicted)
1442                 CDEBUG(D_HA,
1443                        "%s: can't disconnect NID '%s': no exports found\n",
1444                        obd->obd_name, nid);
1445         return exports_evicted;
1446 }
1447 EXPORT_SYMBOL(obd_export_evict_by_nid);
1448
1449 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1450 {
1451         struct cfs_hash *uuid_hash;
1452         struct obd_export *doomed_exp = NULL;
1453         struct obd_uuid doomed_uuid;
1454         int exports_evicted = 0;
1455
1456         spin_lock(&obd->obd_dev_lock);
1457         if (obd->obd_stopping) {
1458                 spin_unlock(&obd->obd_dev_lock);
1459                 return exports_evicted;
1460         }
1461         uuid_hash = obd->obd_uuid_hash;
1462         cfs_hash_getref(uuid_hash);
1463         spin_unlock(&obd->obd_dev_lock);
1464
1465         obd_str2uuid(&doomed_uuid, uuid);
1466         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1467                 CERROR("%s: can't evict myself\n", obd->obd_name);
1468                 cfs_hash_putref(uuid_hash);
1469                 return exports_evicted;
1470         }
1471
1472         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1473
1474         if (!doomed_exp) {
1475                 CERROR("%s: can't disconnect %s: no exports found\n",
1476                        obd->obd_name, uuid);
1477         } else {
1478                 CWARN("%s: evicting %s at administrative request\n",
1479                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1480                 class_fail_export(doomed_exp);
1481                 class_export_put(doomed_exp);
1482                 exports_evicted++;
1483         }
1484         cfs_hash_putref(uuid_hash);
1485
1486         return exports_evicted;
1487 }
1488 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1489
1490 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1491 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1492 EXPORT_SYMBOL(class_export_dump_hook);
1493 #endif
1494
1495 static void print_export_data(struct obd_export *exp, const char *status,
1496                               int locks)
1497 {
1498         struct ptlrpc_reply_state *rs;
1499         struct ptlrpc_reply_state *first_reply = NULL;
1500         int nreplies = 0;
1501
1502         spin_lock(&exp->exp_lock);
1503         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1504                                 rs_exp_list) {
1505                 if (nreplies == 0)
1506                         first_reply = rs;
1507                 nreplies++;
1508         }
1509         spin_unlock(&exp->exp_lock);
1510
1511         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1512                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1513                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1514                atomic_read(&exp->exp_rpc_count),
1515                atomic_read(&exp->exp_cb_count),
1516                atomic_read(&exp->exp_locks_count),
1517                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1518                nreplies, first_reply, nreplies > 3 ? "..." : "",
1519                exp->exp_last_committed);
1520 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1521         if (locks && class_export_dump_hook != NULL)
1522                 class_export_dump_hook(exp);
1523 #endif
1524 }
1525
1526 void dump_exports(struct obd_device *obd, int locks)
1527 {
1528         struct obd_export *exp;
1529
1530         spin_lock(&obd->obd_dev_lock);
1531         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1532                 print_export_data(exp, "ACTIVE", locks);
1533         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1534                 print_export_data(exp, "UNLINKED", locks);
1535         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1536                 print_export_data(exp, "DELAYED", locks);
1537         spin_unlock(&obd->obd_dev_lock);
1538         spin_lock(&obd_zombie_impexp_lock);
1539         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1540                 print_export_data(exp, "ZOMBIE", locks);
1541         spin_unlock(&obd_zombie_impexp_lock);
1542 }
1543 EXPORT_SYMBOL(dump_exports);
1544
1545 void obd_exports_barrier(struct obd_device *obd)
1546 {
1547         int waited = 2;
1548
1549         LASSERT(list_empty(&obd->obd_exports));
1550         spin_lock(&obd->obd_dev_lock);
1551         while (!list_empty(&obd->obd_unlinked_exports)) {
1552                 spin_unlock(&obd->obd_dev_lock);
1553                 set_current_state(TASK_UNINTERRUPTIBLE);
1554                 schedule_timeout(cfs_time_seconds(waited));
1555                 if (waited > 5 && IS_PO2(waited)) {
1556                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1557                                       obd->obd_name, waited,
1558                                       atomic_read(&obd->obd_refcount));
1559                         dump_exports(obd, 1);
1560                 }
1561                 waited *= 2;
1562                 spin_lock(&obd->obd_dev_lock);
1563         }
1564         spin_unlock(&obd->obd_dev_lock);
1565 }
1566 EXPORT_SYMBOL(obd_exports_barrier);
1567
1568 /* Total amount of zombies to be destroyed */
1569 static int zombies_count;
1570
1571 /**
1572  * kill zombie imports and exports
1573  */
1574 void obd_zombie_impexp_cull(void)
1575 {
1576         struct obd_import *import;
1577         struct obd_export *export;
1578
1579         do {
1580                 spin_lock(&obd_zombie_impexp_lock);
1581
1582                 import = NULL;
1583                 if (!list_empty(&obd_zombie_imports)) {
1584                         import = list_entry(obd_zombie_imports.next,
1585                                                 struct obd_import,
1586                                                 imp_zombie_chain);
1587                         list_del_init(&import->imp_zombie_chain);
1588                 }
1589
1590                 export = NULL;
1591                 if (!list_empty(&obd_zombie_exports)) {
1592                         export = list_entry(obd_zombie_exports.next,
1593                                                 struct obd_export,
1594                                                 exp_obd_chain);
1595                         list_del_init(&export->exp_obd_chain);
1596                 }
1597
1598                 spin_unlock(&obd_zombie_impexp_lock);
1599
1600                 if (import != NULL) {
1601                         class_import_destroy(import);
1602                         spin_lock(&obd_zombie_impexp_lock);
1603                         zombies_count--;
1604                         spin_unlock(&obd_zombie_impexp_lock);
1605                 }
1606
1607                 if (export != NULL) {
1608                         class_export_destroy(export);
1609                         spin_lock(&obd_zombie_impexp_lock);
1610                         zombies_count--;
1611                         spin_unlock(&obd_zombie_impexp_lock);
1612                 }
1613
1614                 cond_resched();
1615         } while (import != NULL || export != NULL);
1616 }
1617
1618 static struct completion        obd_zombie_start;
1619 static struct completion        obd_zombie_stop;
1620 static unsigned long            obd_zombie_flags;
1621 static wait_queue_head_t                obd_zombie_waitq;
1622 static pid_t                    obd_zombie_pid;
1623
1624 enum {
1625         OBD_ZOMBIE_STOP         = 0x0001,
1626 };
1627
1628 /**
1629  * check for work for kill zombie import/export thread.
1630  */
1631 static int obd_zombie_impexp_check(void *arg)
1632 {
1633         int rc;
1634
1635         spin_lock(&obd_zombie_impexp_lock);
1636         rc = (zombies_count == 0) &&
1637              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1638         spin_unlock(&obd_zombie_impexp_lock);
1639
1640         return rc;
1641 }
1642
1643 /**
1644  * Add export to the obd_zombie thread and notify it.
1645  */
1646 static void obd_zombie_export_add(struct obd_export *exp)
1647 {
1648         spin_lock(&exp->exp_obd->obd_dev_lock);
1649         LASSERT(!list_empty(&exp->exp_obd_chain));
1650         list_del_init(&exp->exp_obd_chain);
1651         spin_unlock(&exp->exp_obd->obd_dev_lock);
1652         spin_lock(&obd_zombie_impexp_lock);
1653         zombies_count++;
1654         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1655         spin_unlock(&obd_zombie_impexp_lock);
1656
1657         obd_zombie_impexp_notify();
1658 }
1659
1660 /**
1661  * Add import to the obd_zombie thread and notify it.
1662  */
1663 static void obd_zombie_import_add(struct obd_import *imp)
1664 {
1665         LASSERT(!imp->imp_sec);
1666         LASSERT(!imp->imp_rq_pool);
1667         spin_lock(&obd_zombie_impexp_lock);
1668         LASSERT(list_empty(&imp->imp_zombie_chain));
1669         zombies_count++;
1670         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1671         spin_unlock(&obd_zombie_impexp_lock);
1672
1673         obd_zombie_impexp_notify();
1674 }
1675
1676 /**
1677  * notify import/export destroy thread about new zombie.
1678  */
1679 static void obd_zombie_impexp_notify(void)
1680 {
1681         /*
1682          * Make sure obd_zombie_impexp_thread get this notification.
1683          * It is possible this signal only get by obd_zombie_barrier, and
1684          * barrier gulps this notification and sleeps away and hangs ensues
1685          */
1686         wake_up_all(&obd_zombie_waitq);
1687 }
1688
1689 /**
1690  * check whether obd_zombie is idle
1691  */
1692 static int obd_zombie_is_idle(void)
1693 {
1694         int rc;
1695
1696         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1697         spin_lock(&obd_zombie_impexp_lock);
1698         rc = (zombies_count == 0);
1699         spin_unlock(&obd_zombie_impexp_lock);
1700         return rc;
1701 }
1702
1703 /**
1704  * wait when obd_zombie import/export queues become empty
1705  */
1706 void obd_zombie_barrier(void)
1707 {
1708         struct l_wait_info lwi = { 0 };
1709
1710         if (obd_zombie_pid == current_pid())
1711                 /* don't wait for myself */
1712                 return;
1713         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1714 }
1715 EXPORT_SYMBOL(obd_zombie_barrier);
1716
1717
1718 /**
1719  * destroy zombie export/import thread.
1720  */
1721 static int obd_zombie_impexp_thread(void *unused)
1722 {
1723         unshare_fs_struct();
1724         complete(&obd_zombie_start);
1725
1726         obd_zombie_pid = current_pid();
1727
1728         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1729                 struct l_wait_info lwi = { 0 };
1730
1731                 l_wait_event(obd_zombie_waitq,
1732                              !obd_zombie_impexp_check(NULL), &lwi);
1733                 obd_zombie_impexp_cull();
1734
1735                 /*
1736                  * Notify obd_zombie_barrier callers that queues
1737                  * may be empty.
1738                  */
1739                 wake_up(&obd_zombie_waitq);
1740         }
1741
1742         complete(&obd_zombie_stop);
1743
1744         return 0;
1745 }
1746
1747
1748 /**
1749  * start destroy zombie import/export thread
1750  */
1751 int obd_zombie_impexp_init(void)
1752 {
1753         struct task_struct *task;
1754
1755         INIT_LIST_HEAD(&obd_zombie_imports);
1756         INIT_LIST_HEAD(&obd_zombie_exports);
1757         spin_lock_init(&obd_zombie_impexp_lock);
1758         init_completion(&obd_zombie_start);
1759         init_completion(&obd_zombie_stop);
1760         init_waitqueue_head(&obd_zombie_waitq);
1761         obd_zombie_pid = 0;
1762
1763         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1764         if (IS_ERR(task))
1765                 return PTR_ERR(task);
1766
1767         wait_for_completion(&obd_zombie_start);
1768         return 0;
1769 }
1770 /**
1771  * stop destroy zombie import/export thread
1772  */
1773 void obd_zombie_impexp_stop(void)
1774 {
1775         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1776         obd_zombie_impexp_notify();
1777         wait_for_completion(&obd_zombie_stop);
1778 }
1779
1780 /***** Kernel-userspace comm helpers *******/
1781
1782 /* Get length of entire message, including header */
1783 int kuc_len(int payload_len)
1784 {
1785         return sizeof(struct kuc_hdr) + payload_len;
1786 }
1787 EXPORT_SYMBOL(kuc_len);
1788
1789 /* Get a pointer to kuc header, given a ptr to the payload
1790  * @param p Pointer to payload area
1791  * @returns Pointer to kuc header
1792  */
1793 struct kuc_hdr *kuc_ptr(void *p)
1794 {
1795         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1796
1797         LASSERT(lh->kuc_magic == KUC_MAGIC);
1798         return lh;
1799 }
1800 EXPORT_SYMBOL(kuc_ptr);
1801
1802 /* Test if payload is part of kuc message
1803  * @param p Pointer to payload area
1804  * @returns boolean
1805  */
1806 int kuc_ispayload(void *p)
1807 {
1808         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1809
1810         if (kh->kuc_magic == KUC_MAGIC)
1811                 return 1;
1812         else
1813                 return 0;
1814 }
1815 EXPORT_SYMBOL(kuc_ispayload);
1816
1817 /* Alloc space for a message, and fill in header
1818  * @return Pointer to payload area
1819  */
1820 void *kuc_alloc(int payload_len, int transport, int type)
1821 {
1822         struct kuc_hdr *lh;
1823         int len = kuc_len(payload_len);
1824
1825         lh = kzalloc(len, GFP_NOFS);
1826         if (!lh)
1827                 return ERR_PTR(-ENOMEM);
1828
1829         lh->kuc_magic = KUC_MAGIC;
1830         lh->kuc_transport = transport;
1831         lh->kuc_msgtype = type;
1832         lh->kuc_msglen = len;
1833
1834         return (void *)(lh + 1);
1835 }
1836 EXPORT_SYMBOL(kuc_alloc);
1837
1838 /* Takes pointer to payload area */
1839 inline void kuc_free(void *p, int payload_len)
1840 {
1841         struct kuc_hdr *lh = kuc_ptr(p);
1842
1843         kfree(lh);
1844 }
1845 EXPORT_SYMBOL(kuc_free);