cpuidle: Check the result of cpuidle_get_driver() against NULL
[linux-block.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_ost.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46
47 extern struct list_head obd_types;
48 spinlock_t obd_types_lock;
49
50 struct kmem_cache *obd_device_cachep;
51 struct kmem_cache *obdo_cachep;
52 EXPORT_SYMBOL(obdo_cachep);
53 struct kmem_cache *import_cachep;
54
55 struct list_head      obd_zombie_imports;
56 struct list_head      obd_zombie_exports;
57 spinlock_t  obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62                               const char *status, int locks);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112 EXPORT_SYMBOL(class_search_type);
113
114 struct obd_type *class_get_type(const char *name)
115 {
116         struct obd_type *type = class_search_type(name);
117
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         struct lprocfs_vars *vars, const char *name,
162                         struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166         ENTRY;
167
168         /* sanity check */
169         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170
171         if (class_search_type(name)) {
172                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
173                 RETURN(-EEXIST);
174         }
175
176         rc = -ENOMEM;
177         OBD_ALLOC(type, sizeof(*type));
178         if (type == NULL)
179                 RETURN(rc);
180
181         OBD_ALLOC_PTR(type->typ_dt_ops);
182         OBD_ALLOC_PTR(type->typ_md_ops);
183         OBD_ALLOC(type->typ_name, strlen(name) + 1);
184
185         if (type->typ_dt_ops == NULL ||
186             type->typ_md_ops == NULL ||
187             type->typ_name == NULL)
188                 GOTO (failed, rc);
189
190         *(type->typ_dt_ops) = *dt_ops;
191         /* md_ops is optional */
192         if (md_ops)
193                 *(type->typ_md_ops) = *md_ops;
194         strcpy(type->typ_name, name);
195         spin_lock_init(&type->obd_type_lock);
196
197 #ifdef LPROCFS
198         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
199                                               vars, type);
200         if (IS_ERR(type->typ_procroot)) {
201                 rc = PTR_ERR(type->typ_procroot);
202                 type->typ_procroot = NULL;
203                 GOTO (failed, rc);
204         }
205 #endif
206         if (ldt != NULL) {
207                 type->typ_lu = ldt;
208                 rc = lu_device_type_init(ldt);
209                 if (rc != 0)
210                         GOTO (failed, rc);
211         }
212
213         spin_lock(&obd_types_lock);
214         list_add(&type->typ_chain, &obd_types);
215         spin_unlock(&obd_types_lock);
216
217         RETURN (0);
218
219  failed:
220         if (type->typ_name != NULL)
221                 OBD_FREE(type->typ_name, strlen(name) + 1);
222         if (type->typ_md_ops != NULL)
223                 OBD_FREE_PTR(type->typ_md_ops);
224         if (type->typ_dt_ops != NULL)
225                 OBD_FREE_PTR(type->typ_dt_ops);
226         OBD_FREE(type, sizeof(*type));
227         RETURN(rc);
228 }
229 EXPORT_SYMBOL(class_register_type);
230
231 int class_unregister_type(const char *name)
232 {
233         struct obd_type *type = class_search_type(name);
234         ENTRY;
235
236         if (!type) {
237                 CERROR("unknown obd type\n");
238                 RETURN(-EINVAL);
239         }
240
241         if (type->typ_refcnt) {
242                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
243                 /* This is a bad situation, let's make the best of it */
244                 /* Remove ops, but leave the name for debugging */
245                 OBD_FREE_PTR(type->typ_dt_ops);
246                 OBD_FREE_PTR(type->typ_md_ops);
247                 RETURN(-EBUSY);
248         }
249
250         if (type->typ_procroot) {
251                 lprocfs_remove(&type->typ_procroot);
252         }
253
254         if (type->typ_lu)
255                 lu_device_type_fini(type->typ_lu);
256
257         spin_lock(&obd_types_lock);
258         list_del(&type->typ_chain);
259         spin_unlock(&obd_types_lock);
260         OBD_FREE(type->typ_name, strlen(name) + 1);
261         if (type->typ_dt_ops != NULL)
262                 OBD_FREE_PTR(type->typ_dt_ops);
263         if (type->typ_md_ops != NULL)
264                 OBD_FREE_PTR(type->typ_md_ops);
265         OBD_FREE(type, sizeof(*type));
266         RETURN(0);
267 } /* class_unregister_type */
268 EXPORT_SYMBOL(class_unregister_type);
269
270 /**
271  * Create a new obd device.
272  *
273  * Find an empty slot in ::obd_devs[], create a new obd device in it.
274  *
275  * \param[in] type_name obd device type string.
276  * \param[in] name      obd device name.
277  *
278  * \retval NULL if create fails, otherwise return the obd device
279  *       pointer created.
280  */
281 struct obd_device *class_newdev(const char *type_name, const char *name)
282 {
283         struct obd_device *result = NULL;
284         struct obd_device *newdev;
285         struct obd_type *type = NULL;
286         int i;
287         int new_obd_minor = 0;
288         ENTRY;
289
290         if (strlen(name) >= MAX_OBD_NAME) {
291                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
292                 RETURN(ERR_PTR(-EINVAL));
293         }
294
295         type = class_get_type(type_name);
296         if (type == NULL){
297                 CERROR("OBD: unknown type: %s\n", type_name);
298                 RETURN(ERR_PTR(-ENODEV));
299         }
300
301         newdev = obd_device_alloc();
302         if (newdev == NULL)
303                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
304
305         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
306
307         write_lock(&obd_dev_lock);
308         for (i = 0; i < class_devno_max(); i++) {
309                 struct obd_device *obd = class_num2obd(i);
310
311                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
312                         CERROR("Device %s already exists at %d, won't add\n",
313                                name, i);
314                         if (result) {
315                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
316                                          "%p obd_magic %08x != %08x\n", result,
317                                          result->obd_magic, OBD_DEVICE_MAGIC);
318                                 LASSERTF(result->obd_minor == new_obd_minor,
319                                          "%p obd_minor %d != %d\n", result,
320                                          result->obd_minor, new_obd_minor);
321
322                                 obd_devs[result->obd_minor] = NULL;
323                                 result->obd_name[0]='\0';
324                          }
325                         result = ERR_PTR(-EEXIST);
326                         break;
327                 }
328                 if (!result && !obd) {
329                         result = newdev;
330                         result->obd_minor = i;
331                         new_obd_minor = i;
332                         result->obd_type = type;
333                         strncpy(result->obd_name, name,
334                                 sizeof(result->obd_name) - 1);
335                         obd_devs[i] = result;
336                 }
337         }
338         write_unlock(&obd_dev_lock);
339
340         if (result == NULL && i >= class_devno_max()) {
341                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
342                        class_devno_max());
343                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
344         }
345
346         if (IS_ERR(result))
347                 GOTO(out, result);
348
349         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
350                result->obd_name, result);
351
352         RETURN(result);
353 out:
354         obd_device_free(newdev);
355 out_type:
356         class_put_type(type);
357         return result;
358 }
359
360 void class_release_dev(struct obd_device *obd)
361 {
362         struct obd_type *obd_type = obd->obd_type;
363
364         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
365                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
366         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
367                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
368         LASSERT(obd_type != NULL);
369
370         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
371                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
372
373         write_lock(&obd_dev_lock);
374         obd_devs[obd->obd_minor] = NULL;
375         write_unlock(&obd_dev_lock);
376         obd_device_free(obd);
377
378         class_put_type(obd_type);
379 }
380
381 int class_name2dev(const char *name)
382 {
383         int i;
384
385         if (!name)
386                 return -1;
387
388         read_lock(&obd_dev_lock);
389         for (i = 0; i < class_devno_max(); i++) {
390                 struct obd_device *obd = class_num2obd(i);
391
392                 if (obd && strcmp(name, obd->obd_name) == 0) {
393                         /* Make sure we finished attaching before we give
394                            out any references */
395                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
396                         if (obd->obd_attached) {
397                                 read_unlock(&obd_dev_lock);
398                                 return i;
399                         }
400                         break;
401                 }
402         }
403         read_unlock(&obd_dev_lock);
404
405         return -1;
406 }
407 EXPORT_SYMBOL(class_name2dev);
408
409 struct obd_device *class_name2obd(const char *name)
410 {
411         int dev = class_name2dev(name);
412
413         if (dev < 0 || dev > class_devno_max())
414                 return NULL;
415         return class_num2obd(dev);
416 }
417 EXPORT_SYMBOL(class_name2obd);
418
419 int class_uuid2dev(struct obd_uuid *uuid)
420 {
421         int i;
422
423         read_lock(&obd_dev_lock);
424         for (i = 0; i < class_devno_max(); i++) {
425                 struct obd_device *obd = class_num2obd(i);
426
427                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
428                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
429                         read_unlock(&obd_dev_lock);
430                         return i;
431                 }
432         }
433         read_unlock(&obd_dev_lock);
434
435         return -1;
436 }
437 EXPORT_SYMBOL(class_uuid2dev);
438
439 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
440 {
441         int dev = class_uuid2dev(uuid);
442         if (dev < 0)
443                 return NULL;
444         return class_num2obd(dev);
445 }
446 EXPORT_SYMBOL(class_uuid2obd);
447
448 /**
449  * Get obd device from ::obd_devs[]
450  *
451  * \param num [in] array index
452  *
453  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
454  *       otherwise return the obd device there.
455  */
456 struct obd_device *class_num2obd(int num)
457 {
458         struct obd_device *obd = NULL;
459
460         if (num < class_devno_max()) {
461                 obd = obd_devs[num];
462                 if (obd == NULL)
463                         return NULL;
464
465                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
466                          "%p obd_magic %08x != %08x\n",
467                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
468                 LASSERTF(obd->obd_minor == num,
469                          "%p obd_minor %0d != %0d\n",
470                          obd, obd->obd_minor, num);
471         }
472
473         return obd;
474 }
475 EXPORT_SYMBOL(class_num2obd);
476
477 /**
478  * Get obd devices count. Device in any
479  *    state are counted
480  * \retval obd device count
481  */
482 int get_devices_count(void)
483 {
484         int index, max_index = class_devno_max(), dev_count = 0;
485
486         read_lock(&obd_dev_lock);
487         for (index = 0; index <= max_index; index++) {
488                 struct obd_device *obd = class_num2obd(index);
489                 if (obd != NULL)
490                         dev_count++;
491         }
492         read_unlock(&obd_dev_lock);
493
494         return dev_count;
495 }
496 EXPORT_SYMBOL(get_devices_count);
497
498 void class_obd_list(void)
499 {
500         char *status;
501         int i;
502
503         read_lock(&obd_dev_lock);
504         for (i = 0; i < class_devno_max(); i++) {
505                 struct obd_device *obd = class_num2obd(i);
506
507                 if (obd == NULL)
508                         continue;
509                 if (obd->obd_stopping)
510                         status = "ST";
511                 else if (obd->obd_set_up)
512                         status = "UP";
513                 else if (obd->obd_attached)
514                         status = "AT";
515                 else
516                         status = "--";
517                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
518                          i, status, obd->obd_type->typ_name,
519                          obd->obd_name, obd->obd_uuid.uuid,
520                          atomic_read(&obd->obd_refcount));
521         }
522         read_unlock(&obd_dev_lock);
523         return;
524 }
525
526 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
527    specified, then only the client with that uuid is returned,
528    otherwise any client connected to the tgt is returned. */
529 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
530                                           const char * typ_name,
531                                           struct obd_uuid *grp_uuid)
532 {
533         int i;
534
535         read_lock(&obd_dev_lock);
536         for (i = 0; i < class_devno_max(); i++) {
537                 struct obd_device *obd = class_num2obd(i);
538
539                 if (obd == NULL)
540                         continue;
541                 if ((strncmp(obd->obd_type->typ_name, typ_name,
542                              strlen(typ_name)) == 0)) {
543                         if (obd_uuid_equals(tgt_uuid,
544                                             &obd->u.cli.cl_target_uuid) &&
545                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
546                                                          &obd->obd_uuid) : 1)) {
547                                 read_unlock(&obd_dev_lock);
548                                 return obd;
549                         }
550                 }
551         }
552         read_unlock(&obd_dev_lock);
553
554         return NULL;
555 }
556 EXPORT_SYMBOL(class_find_client_obd);
557
558 /* Iterate the obd_device list looking devices have grp_uuid. Start
559    searching at *next, and if a device is found, the next index to look
560    at is saved in *next. If next is NULL, then the first matching device
561    will always be returned. */
562 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
563 {
564         int i;
565
566         if (next == NULL)
567                 i = 0;
568         else if (*next >= 0 && *next < class_devno_max())
569                 i = *next;
570         else
571                 return NULL;
572
573         read_lock(&obd_dev_lock);
574         for (; i < class_devno_max(); i++) {
575                 struct obd_device *obd = class_num2obd(i);
576
577                 if (obd == NULL)
578                         continue;
579                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
580                         if (next != NULL)
581                                 *next = i+1;
582                         read_unlock(&obd_dev_lock);
583                         return obd;
584                 }
585         }
586         read_unlock(&obd_dev_lock);
587
588         return NULL;
589 }
590 EXPORT_SYMBOL(class_devices_in_group);
591
592 /**
593  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
594  * adjust sptlrpc settings accordingly.
595  */
596 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
597 {
598         struct obd_device  *obd;
599         const char       *type;
600         int              i, rc = 0, rc2;
601
602         LASSERT(namelen > 0);
603
604         read_lock(&obd_dev_lock);
605         for (i = 0; i < class_devno_max(); i++) {
606                 obd = class_num2obd(i);
607
608                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
609                         continue;
610
611                 /* only notify mdc, osc, mdt, ost */
612                 type = obd->obd_type->typ_name;
613                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
614                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
615                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
616                     strcmp(type, LUSTRE_OST_NAME) != 0)
617                         continue;
618
619                 if (strncmp(obd->obd_name, fsname, namelen))
620                         continue;
621
622                 class_incref(obd, __FUNCTION__, obd);
623                 read_unlock(&obd_dev_lock);
624                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
625                                          sizeof(KEY_SPTLRPC_CONF),
626                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
627                 rc = rc ? rc : rc2;
628                 class_decref(obd, __FUNCTION__, obd);
629                 read_lock(&obd_dev_lock);
630         }
631         read_unlock(&obd_dev_lock);
632         return rc;
633 }
634 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
635
636 void obd_cleanup_caches(void)
637 {
638         ENTRY;
639         if (obd_device_cachep) {
640                 kmem_cache_destroy(obd_device_cachep);
641                 obd_device_cachep = NULL;
642         }
643         if (obdo_cachep) {
644                 kmem_cache_destroy(obdo_cachep);
645                 obdo_cachep = NULL;
646         }
647         if (import_cachep) {
648                 kmem_cache_destroy(import_cachep);
649                 import_cachep = NULL;
650         }
651         if (capa_cachep) {
652                 kmem_cache_destroy(capa_cachep);
653                 capa_cachep = NULL;
654         }
655         EXIT;
656 }
657
658 int obd_init_caches(void)
659 {
660         ENTRY;
661
662         LASSERT(obd_device_cachep == NULL);
663         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
664                                                  sizeof(struct obd_device),
665                                                  0, 0, NULL);
666         if (!obd_device_cachep)
667                 GOTO(out, -ENOMEM);
668
669         LASSERT(obdo_cachep == NULL);
670         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
671                                            0, 0, NULL);
672         if (!obdo_cachep)
673                 GOTO(out, -ENOMEM);
674
675         LASSERT(import_cachep == NULL);
676         import_cachep = kmem_cache_create("ll_import_cache",
677                                              sizeof(struct obd_import),
678                                              0, 0, NULL);
679         if (!import_cachep)
680                 GOTO(out, -ENOMEM);
681
682         LASSERT(capa_cachep == NULL);
683         capa_cachep = kmem_cache_create("capa_cache",
684                                            sizeof(struct obd_capa), 0, 0, NULL);
685         if (!capa_cachep)
686                 GOTO(out, -ENOMEM);
687
688         RETURN(0);
689  out:
690         obd_cleanup_caches();
691         RETURN(-ENOMEM);
692
693 }
694
695 /* map connection to client */
696 struct obd_export *class_conn2export(struct lustre_handle *conn)
697 {
698         struct obd_export *export;
699         ENTRY;
700
701         if (!conn) {
702                 CDEBUG(D_CACHE, "looking for null handle\n");
703                 RETURN(NULL);
704         }
705
706         if (conn->cookie == -1) {  /* this means assign a new connection */
707                 CDEBUG(D_CACHE, "want a new connection\n");
708                 RETURN(NULL);
709         }
710
711         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
712         export = class_handle2object(conn->cookie);
713         RETURN(export);
714 }
715 EXPORT_SYMBOL(class_conn2export);
716
717 struct obd_device *class_exp2obd(struct obd_export *exp)
718 {
719         if (exp)
720                 return exp->exp_obd;
721         return NULL;
722 }
723 EXPORT_SYMBOL(class_exp2obd);
724
725 struct obd_device *class_conn2obd(struct lustre_handle *conn)
726 {
727         struct obd_export *export;
728         export = class_conn2export(conn);
729         if (export) {
730                 struct obd_device *obd = export->exp_obd;
731                 class_export_put(export);
732                 return obd;
733         }
734         return NULL;
735 }
736 EXPORT_SYMBOL(class_conn2obd);
737
738 struct obd_import *class_exp2cliimp(struct obd_export *exp)
739 {
740         struct obd_device *obd = exp->exp_obd;
741         if (obd == NULL)
742                 return NULL;
743         return obd->u.cli.cl_import;
744 }
745 EXPORT_SYMBOL(class_exp2cliimp);
746
747 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
748 {
749         struct obd_device *obd = class_conn2obd(conn);
750         if (obd == NULL)
751                 return NULL;
752         return obd->u.cli.cl_import;
753 }
754 EXPORT_SYMBOL(class_conn2cliimp);
755
756 /* Export management functions */
757 static void class_export_destroy(struct obd_export *exp)
758 {
759         struct obd_device *obd = exp->exp_obd;
760         ENTRY;
761
762         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
763         LASSERT(obd != NULL);
764
765         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
766                exp->exp_client_uuid.uuid, obd->obd_name);
767
768         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
769         if (exp->exp_connection)
770                 ptlrpc_put_connection_superhack(exp->exp_connection);
771
772         LASSERT(list_empty(&exp->exp_outstanding_replies));
773         LASSERT(list_empty(&exp->exp_uncommitted_replies));
774         LASSERT(list_empty(&exp->exp_req_replay_queue));
775         LASSERT(list_empty(&exp->exp_hp_rpcs));
776         obd_destroy_export(exp);
777         class_decref(obd, "export", exp);
778
779         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
780         EXIT;
781 }
782
783 static void export_handle_addref(void *export)
784 {
785         class_export_get(export);
786 }
787
788 static struct portals_handle_ops export_handle_ops = {
789         .hop_addref = export_handle_addref,
790         .hop_free   = NULL,
791 };
792
793 struct obd_export *class_export_get(struct obd_export *exp)
794 {
795         atomic_inc(&exp->exp_refcount);
796         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
797                atomic_read(&exp->exp_refcount));
798         return exp;
799 }
800 EXPORT_SYMBOL(class_export_get);
801
802 void class_export_put(struct obd_export *exp)
803 {
804         LASSERT(exp != NULL);
805         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
806         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
807                atomic_read(&exp->exp_refcount) - 1);
808
809         if (atomic_dec_and_test(&exp->exp_refcount)) {
810                 LASSERT(!list_empty(&exp->exp_obd_chain));
811                 CDEBUG(D_IOCTL, "final put %p/%s\n",
812                        exp, exp->exp_client_uuid.uuid);
813
814                 /* release nid stat refererence */
815                 lprocfs_exp_cleanup(exp);
816
817                 obd_zombie_export_add(exp);
818         }
819 }
820 EXPORT_SYMBOL(class_export_put);
821
822 /* Creates a new export, adds it to the hash table, and returns a
823  * pointer to it. The refcount is 2: one for the hash reference, and
824  * one for the pointer returned by this function. */
825 struct obd_export *class_new_export(struct obd_device *obd,
826                                     struct obd_uuid *cluuid)
827 {
828         struct obd_export *export;
829         cfs_hash_t *hash = NULL;
830         int rc = 0;
831         ENTRY;
832
833         OBD_ALLOC_PTR(export);
834         if (!export)
835                 return ERR_PTR(-ENOMEM);
836
837         export->exp_conn_cnt = 0;
838         export->exp_lock_hash = NULL;
839         export->exp_flock_hash = NULL;
840         atomic_set(&export->exp_refcount, 2);
841         atomic_set(&export->exp_rpc_count, 0);
842         atomic_set(&export->exp_cb_count, 0);
843         atomic_set(&export->exp_locks_count, 0);
844 #if LUSTRE_TRACKS_LOCK_EXP_REFS
845         INIT_LIST_HEAD(&export->exp_locks_list);
846         spin_lock_init(&export->exp_locks_list_guard);
847 #endif
848         atomic_set(&export->exp_replay_count, 0);
849         export->exp_obd = obd;
850         INIT_LIST_HEAD(&export->exp_outstanding_replies);
851         spin_lock_init(&export->exp_uncommitted_replies_lock);
852         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
853         INIT_LIST_HEAD(&export->exp_req_replay_queue);
854         INIT_LIST_HEAD(&export->exp_handle.h_link);
855         INIT_LIST_HEAD(&export->exp_hp_rpcs);
856         class_handle_hash(&export->exp_handle, &export_handle_ops);
857         export->exp_last_request_time = cfs_time_current_sec();
858         spin_lock_init(&export->exp_lock);
859         spin_lock_init(&export->exp_rpc_lock);
860         INIT_HLIST_NODE(&export->exp_uuid_hash);
861         INIT_HLIST_NODE(&export->exp_nid_hash);
862         spin_lock_init(&export->exp_bl_list_lock);
863         INIT_LIST_HEAD(&export->exp_bl_list);
864
865         export->exp_sp_peer = LUSTRE_SP_ANY;
866         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
867         export->exp_client_uuid = *cluuid;
868         obd_init_export(export);
869
870         spin_lock(&obd->obd_dev_lock);
871         /* shouldn't happen, but might race */
872         if (obd->obd_stopping)
873                 GOTO(exit_unlock, rc = -ENODEV);
874
875         hash = cfs_hash_getref(obd->obd_uuid_hash);
876         if (hash == NULL)
877                 GOTO(exit_unlock, rc = -ENODEV);
878         spin_unlock(&obd->obd_dev_lock);
879
880         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
881                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
882                 if (rc != 0) {
883                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
884                                       obd->obd_name, cluuid->uuid, rc);
885                         GOTO(exit_err, rc = -EALREADY);
886                 }
887         }
888
889         spin_lock(&obd->obd_dev_lock);
890         if (obd->obd_stopping) {
891                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
892                 GOTO(exit_unlock, rc = -ENODEV);
893         }
894
895         class_incref(obd, "export", export);
896         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
897         list_add_tail(&export->exp_obd_chain_timed,
898                           &export->exp_obd->obd_exports_timed);
899         export->exp_obd->obd_num_exports++;
900         spin_unlock(&obd->obd_dev_lock);
901         cfs_hash_putref(hash);
902         RETURN(export);
903
904 exit_unlock:
905         spin_unlock(&obd->obd_dev_lock);
906 exit_err:
907         if (hash)
908                 cfs_hash_putref(hash);
909         class_handle_unhash(&export->exp_handle);
910         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
911         obd_destroy_export(export);
912         OBD_FREE_PTR(export);
913         return ERR_PTR(rc);
914 }
915 EXPORT_SYMBOL(class_new_export);
916
917 void class_unlink_export(struct obd_export *exp)
918 {
919         class_handle_unhash(&exp->exp_handle);
920
921         spin_lock(&exp->exp_obd->obd_dev_lock);
922         /* delete an uuid-export hashitem from hashtables */
923         if (!hlist_unhashed(&exp->exp_uuid_hash))
924                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
925                              &exp->exp_client_uuid,
926                              &exp->exp_uuid_hash);
927
928         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
929         list_del_init(&exp->exp_obd_chain_timed);
930         exp->exp_obd->obd_num_exports--;
931         spin_unlock(&exp->exp_obd->obd_dev_lock);
932         class_export_put(exp);
933 }
934 EXPORT_SYMBOL(class_unlink_export);
935
936 /* Import management functions */
937 void class_import_destroy(struct obd_import *imp)
938 {
939         ENTRY;
940
941         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
942                 imp->imp_obd->obd_name);
943
944         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
945
946         ptlrpc_put_connection_superhack(imp->imp_connection);
947
948         while (!list_empty(&imp->imp_conn_list)) {
949                 struct obd_import_conn *imp_conn;
950
951                 imp_conn = list_entry(imp->imp_conn_list.next,
952                                           struct obd_import_conn, oic_item);
953                 list_del_init(&imp_conn->oic_item);
954                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
955                 OBD_FREE(imp_conn, sizeof(*imp_conn));
956         }
957
958         LASSERT(imp->imp_sec == NULL);
959         class_decref(imp->imp_obd, "import", imp);
960         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
961         EXIT;
962 }
963
964 static void import_handle_addref(void *import)
965 {
966         class_import_get(import);
967 }
968
969 static struct portals_handle_ops import_handle_ops = {
970         .hop_addref = import_handle_addref,
971         .hop_free   = NULL,
972 };
973
974 struct obd_import *class_import_get(struct obd_import *import)
975 {
976         atomic_inc(&import->imp_refcount);
977         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
978                atomic_read(&import->imp_refcount),
979                import->imp_obd->obd_name);
980         return import;
981 }
982 EXPORT_SYMBOL(class_import_get);
983
984 void class_import_put(struct obd_import *imp)
985 {
986         ENTRY;
987
988         LASSERT(list_empty(&imp->imp_zombie_chain));
989         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
990
991         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
992                atomic_read(&imp->imp_refcount) - 1,
993                imp->imp_obd->obd_name);
994
995         if (atomic_dec_and_test(&imp->imp_refcount)) {
996                 CDEBUG(D_INFO, "final put import %p\n", imp);
997                 obd_zombie_import_add(imp);
998         }
999
1000         /* catch possible import put race */
1001         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1002         EXIT;
1003 }
1004 EXPORT_SYMBOL(class_import_put);
1005
1006 static void init_imp_at(struct imp_at *at) {
1007         int i;
1008         at_init(&at->iat_net_latency, 0, 0);
1009         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1010                 /* max service estimates are tracked on the server side, so
1011                    don't use the AT history here, just use the last reported
1012                    val. (But keep hist for proc histogram, worst_ever) */
1013                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1014                         AT_FLG_NOHIST);
1015         }
1016 }
1017
1018 struct obd_import *class_new_import(struct obd_device *obd)
1019 {
1020         struct obd_import *imp;
1021
1022         OBD_ALLOC(imp, sizeof(*imp));
1023         if (imp == NULL)
1024                 return NULL;
1025
1026         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1027         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1028         INIT_LIST_HEAD(&imp->imp_replay_list);
1029         INIT_LIST_HEAD(&imp->imp_sending_list);
1030         INIT_LIST_HEAD(&imp->imp_delayed_list);
1031         spin_lock_init(&imp->imp_lock);
1032         imp->imp_last_success_conn = 0;
1033         imp->imp_state = LUSTRE_IMP_NEW;
1034         imp->imp_obd = class_incref(obd, "import", imp);
1035         mutex_init(&imp->imp_sec_mutex);
1036         init_waitqueue_head(&imp->imp_recovery_waitq);
1037
1038         atomic_set(&imp->imp_refcount, 2);
1039         atomic_set(&imp->imp_unregistering, 0);
1040         atomic_set(&imp->imp_inflight, 0);
1041         atomic_set(&imp->imp_replay_inflight, 0);
1042         atomic_set(&imp->imp_inval_count, 0);
1043         INIT_LIST_HEAD(&imp->imp_conn_list);
1044         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1045         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1046         init_imp_at(&imp->imp_at);
1047
1048         /* the default magic is V2, will be used in connect RPC, and
1049          * then adjusted according to the flags in request/reply. */
1050         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1051
1052         return imp;
1053 }
1054 EXPORT_SYMBOL(class_new_import);
1055
1056 void class_destroy_import(struct obd_import *import)
1057 {
1058         LASSERT(import != NULL);
1059         LASSERT(import != LP_POISON);
1060
1061         class_handle_unhash(&import->imp_handle);
1062
1063         spin_lock(&import->imp_lock);
1064         import->imp_generation++;
1065         spin_unlock(&import->imp_lock);
1066         class_import_put(import);
1067 }
1068 EXPORT_SYMBOL(class_destroy_import);
1069
1070 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1071
1072 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1073 {
1074         spin_lock(&exp->exp_locks_list_guard);
1075
1076         LASSERT(lock->l_exp_refs_nr >= 0);
1077
1078         if (lock->l_exp_refs_target != NULL &&
1079             lock->l_exp_refs_target != exp) {
1080                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1081                               exp, lock, lock->l_exp_refs_target);
1082         }
1083         if ((lock->l_exp_refs_nr ++) == 0) {
1084                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1085                 lock->l_exp_refs_target = exp;
1086         }
1087         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1088                lock, exp, lock->l_exp_refs_nr);
1089         spin_unlock(&exp->exp_locks_list_guard);
1090 }
1091 EXPORT_SYMBOL(__class_export_add_lock_ref);
1092
1093 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1094 {
1095         spin_lock(&exp->exp_locks_list_guard);
1096         LASSERT(lock->l_exp_refs_nr > 0);
1097         if (lock->l_exp_refs_target != exp) {
1098                 LCONSOLE_WARN("lock %p, "
1099                               "mismatching export pointers: %p, %p\n",
1100                               lock, lock->l_exp_refs_target, exp);
1101         }
1102         if (-- lock->l_exp_refs_nr == 0) {
1103                 list_del_init(&lock->l_exp_refs_link);
1104                 lock->l_exp_refs_target = NULL;
1105         }
1106         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1107                lock, exp, lock->l_exp_refs_nr);
1108         spin_unlock(&exp->exp_locks_list_guard);
1109 }
1110 EXPORT_SYMBOL(__class_export_del_lock_ref);
1111 #endif
1112
1113 /* A connection defines an export context in which preallocation can
1114    be managed. This releases the export pointer reference, and returns
1115    the export handle, so the export refcount is 1 when this function
1116    returns. */
1117 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1118                   struct obd_uuid *cluuid)
1119 {
1120         struct obd_export *export;
1121         LASSERT(conn != NULL);
1122         LASSERT(obd != NULL);
1123         LASSERT(cluuid != NULL);
1124         ENTRY;
1125
1126         export = class_new_export(obd, cluuid);
1127         if (IS_ERR(export))
1128                 RETURN(PTR_ERR(export));
1129
1130         conn->cookie = export->exp_handle.h_cookie;
1131         class_export_put(export);
1132
1133         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1134                cluuid->uuid, conn->cookie);
1135         RETURN(0);
1136 }
1137 EXPORT_SYMBOL(class_connect);
1138
1139 /* if export is involved in recovery then clean up related things */
1140 void class_export_recovery_cleanup(struct obd_export *exp)
1141 {
1142         struct obd_device *obd = exp->exp_obd;
1143
1144         spin_lock(&obd->obd_recovery_task_lock);
1145         if (exp->exp_delayed)
1146                 obd->obd_delayed_clients--;
1147         if (obd->obd_recovering) {
1148                 if (exp->exp_in_recovery) {
1149                         spin_lock(&exp->exp_lock);
1150                         exp->exp_in_recovery = 0;
1151                         spin_unlock(&exp->exp_lock);
1152                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1153                         atomic_dec(&obd->obd_connected_clients);
1154                 }
1155
1156                 /* if called during recovery then should update
1157                  * obd_stale_clients counter,
1158                  * lightweight exports are not counted */
1159                 if (exp->exp_failed &&
1160                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1161                         exp->exp_obd->obd_stale_clients++;
1162         }
1163         spin_unlock(&obd->obd_recovery_task_lock);
1164         /** Cleanup req replay fields */
1165         if (exp->exp_req_replay_needed) {
1166                 spin_lock(&exp->exp_lock);
1167                 exp->exp_req_replay_needed = 0;
1168                 spin_unlock(&exp->exp_lock);
1169                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1170                 atomic_dec(&obd->obd_req_replay_clients);
1171         }
1172         /** Cleanup lock replay data */
1173         if (exp->exp_lock_replay_needed) {
1174                 spin_lock(&exp->exp_lock);
1175                 exp->exp_lock_replay_needed = 0;
1176                 spin_unlock(&exp->exp_lock);
1177                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1178                 atomic_dec(&obd->obd_lock_replay_clients);
1179         }
1180 }
1181
1182 /* This function removes 1-3 references from the export:
1183  * 1 - for export pointer passed
1184  * and if disconnect really need
1185  * 2 - removing from hash
1186  * 3 - in client_unlink_export
1187  * The export pointer passed to this function can destroyed */
1188 int class_disconnect(struct obd_export *export)
1189 {
1190         int already_disconnected;
1191         ENTRY;
1192
1193         if (export == NULL) {
1194                 CWARN("attempting to free NULL export %p\n", export);
1195                 RETURN(-EINVAL);
1196         }
1197
1198         spin_lock(&export->exp_lock);
1199         already_disconnected = export->exp_disconnected;
1200         export->exp_disconnected = 1;
1201         spin_unlock(&export->exp_lock);
1202
1203         /* class_cleanup(), abort_recovery(), and class_fail_export()
1204          * all end up in here, and if any of them race we shouldn't
1205          * call extra class_export_puts(). */
1206         if (already_disconnected) {
1207                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1208                 GOTO(no_disconn, already_disconnected);
1209         }
1210
1211         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1212                export->exp_handle.h_cookie);
1213
1214         if (!hlist_unhashed(&export->exp_nid_hash))
1215                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1216                              &export->exp_connection->c_peer.nid,
1217                              &export->exp_nid_hash);
1218
1219         class_export_recovery_cleanup(export);
1220         class_unlink_export(export);
1221 no_disconn:
1222         class_export_put(export);
1223         RETURN(0);
1224 }
1225 EXPORT_SYMBOL(class_disconnect);
1226
1227 /* Return non-zero for a fully connected export */
1228 int class_connected_export(struct obd_export *exp)
1229 {
1230         if (exp) {
1231                 int connected;
1232                 spin_lock(&exp->exp_lock);
1233                 connected = (exp->exp_conn_cnt > 0);
1234                 spin_unlock(&exp->exp_lock);
1235                 return connected;
1236         }
1237         return 0;
1238 }
1239 EXPORT_SYMBOL(class_connected_export);
1240
1241 static void class_disconnect_export_list(struct list_head *list,
1242                                          enum obd_option flags)
1243 {
1244         int rc;
1245         struct obd_export *exp;
1246         ENTRY;
1247
1248         /* It's possible that an export may disconnect itself, but
1249          * nothing else will be added to this list. */
1250         while (!list_empty(list)) {
1251                 exp = list_entry(list->next, struct obd_export,
1252                                      exp_obd_chain);
1253                 /* need for safe call CDEBUG after obd_disconnect */
1254                 class_export_get(exp);
1255
1256                 spin_lock(&exp->exp_lock);
1257                 exp->exp_flags = flags;
1258                 spin_unlock(&exp->exp_lock);
1259
1260                 if (obd_uuid_equals(&exp->exp_client_uuid,
1261                                     &exp->exp_obd->obd_uuid)) {
1262                         CDEBUG(D_HA,
1263                                "exp %p export uuid == obd uuid, don't discon\n",
1264                                exp);
1265                         /* Need to delete this now so we don't end up pointing
1266                          * to work_list later when this export is cleaned up. */
1267                         list_del_init(&exp->exp_obd_chain);
1268                         class_export_put(exp);
1269                         continue;
1270                 }
1271
1272                 class_export_get(exp);
1273                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1274                        "last request at "CFS_TIME_T"\n",
1275                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1276                        exp, exp->exp_last_request_time);
1277                 /* release one export reference anyway */
1278                 rc = obd_disconnect(exp);
1279
1280                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1281                        obd_export_nid2str(exp), exp, rc);
1282                 class_export_put(exp);
1283         }
1284         EXIT;
1285 }
1286
1287 void class_disconnect_exports(struct obd_device *obd)
1288 {
1289         struct list_head work_list;
1290         ENTRY;
1291
1292         /* Move all of the exports from obd_exports to a work list, en masse. */
1293         INIT_LIST_HEAD(&work_list);
1294         spin_lock(&obd->obd_dev_lock);
1295         list_splice_init(&obd->obd_exports, &work_list);
1296         list_splice_init(&obd->obd_delayed_exports, &work_list);
1297         spin_unlock(&obd->obd_dev_lock);
1298
1299         if (!list_empty(&work_list)) {
1300                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1301                        "disconnecting them\n", obd->obd_minor, obd);
1302                 class_disconnect_export_list(&work_list,
1303                                              exp_flags_from_obd(obd));
1304         } else
1305                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1306                        obd->obd_minor, obd);
1307         EXIT;
1308 }
1309 EXPORT_SYMBOL(class_disconnect_exports);
1310
1311 /* Remove exports that have not completed recovery.
1312  */
1313 void class_disconnect_stale_exports(struct obd_device *obd,
1314                                     int (*test_export)(struct obd_export *))
1315 {
1316         struct list_head work_list;
1317         struct obd_export *exp, *n;
1318         int evicted = 0;
1319         ENTRY;
1320
1321         INIT_LIST_HEAD(&work_list);
1322         spin_lock(&obd->obd_dev_lock);
1323         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1324                                      exp_obd_chain) {
1325                 /* don't count self-export as client */
1326                 if (obd_uuid_equals(&exp->exp_client_uuid,
1327                                     &exp->exp_obd->obd_uuid))
1328                         continue;
1329
1330                 /* don't evict clients which have no slot in last_rcvd
1331                  * (e.g. lightweight connection) */
1332                 if (exp->exp_target_data.ted_lr_idx == -1)
1333                         continue;
1334
1335                 spin_lock(&exp->exp_lock);
1336                 if (exp->exp_failed || test_export(exp)) {
1337                         spin_unlock(&exp->exp_lock);
1338                         continue;
1339                 }
1340                 exp->exp_failed = 1;
1341                 spin_unlock(&exp->exp_lock);
1342
1343                 list_move(&exp->exp_obd_chain, &work_list);
1344                 evicted++;
1345                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1346                        obd->obd_name, exp->exp_client_uuid.uuid,
1347                        exp->exp_connection == NULL ? "<unknown>" :
1348                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1349                 print_export_data(exp, "EVICTING", 0);
1350         }
1351         spin_unlock(&obd->obd_dev_lock);
1352
1353         if (evicted)
1354                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1355                               obd->obd_name, evicted);
1356
1357         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1358                                                  OBD_OPT_ABORT_RECOV);
1359         EXIT;
1360 }
1361 EXPORT_SYMBOL(class_disconnect_stale_exports);
1362
1363 void class_fail_export(struct obd_export *exp)
1364 {
1365         int rc, already_failed;
1366
1367         spin_lock(&exp->exp_lock);
1368         already_failed = exp->exp_failed;
1369         exp->exp_failed = 1;
1370         spin_unlock(&exp->exp_lock);
1371
1372         if (already_failed) {
1373                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1374                        exp, exp->exp_client_uuid.uuid);
1375                 return;
1376         }
1377
1378         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1379                exp, exp->exp_client_uuid.uuid);
1380
1381         if (obd_dump_on_timeout)
1382                 libcfs_debug_dumplog();
1383
1384         /* need for safe call CDEBUG after obd_disconnect */
1385         class_export_get(exp);
1386
1387         /* Most callers into obd_disconnect are removing their own reference
1388          * (request, for example) in addition to the one from the hash table.
1389          * We don't have such a reference here, so make one. */
1390         class_export_get(exp);
1391         rc = obd_disconnect(exp);
1392         if (rc)
1393                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1394         else
1395                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1396                        exp, exp->exp_client_uuid.uuid);
1397         class_export_put(exp);
1398 }
1399 EXPORT_SYMBOL(class_fail_export);
1400
1401 char *obd_export_nid2str(struct obd_export *exp)
1402 {
1403         if (exp->exp_connection != NULL)
1404                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1405
1406         return "(no nid)";
1407 }
1408 EXPORT_SYMBOL(obd_export_nid2str);
1409
1410 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1411 {
1412         cfs_hash_t *nid_hash;
1413         struct obd_export *doomed_exp = NULL;
1414         int exports_evicted = 0;
1415
1416         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1417
1418         spin_lock(&obd->obd_dev_lock);
1419         /* umount has run already, so evict thread should leave
1420          * its task to umount thread now */
1421         if (obd->obd_stopping) {
1422                 spin_unlock(&obd->obd_dev_lock);
1423                 return exports_evicted;
1424         }
1425         nid_hash = obd->obd_nid_hash;
1426         cfs_hash_getref(nid_hash);
1427         spin_unlock(&obd->obd_dev_lock);
1428
1429         do {
1430                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1431                 if (doomed_exp == NULL)
1432                         break;
1433
1434                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1435                          "nid %s found, wanted nid %s, requested nid %s\n",
1436                          obd_export_nid2str(doomed_exp),
1437                          libcfs_nid2str(nid_key), nid);
1438                 LASSERTF(doomed_exp != obd->obd_self_export,
1439                          "self-export is hashed by NID?\n");
1440                 exports_evicted++;
1441                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1442                               "request\n", obd->obd_name,
1443                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1444                               obd_export_nid2str(doomed_exp));
1445                 class_fail_export(doomed_exp);
1446                 class_export_put(doomed_exp);
1447         } while (1);
1448
1449         cfs_hash_putref(nid_hash);
1450
1451         if (!exports_evicted)
1452                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1453                        obd->obd_name, nid);
1454         return exports_evicted;
1455 }
1456 EXPORT_SYMBOL(obd_export_evict_by_nid);
1457
1458 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1459 {
1460         cfs_hash_t *uuid_hash;
1461         struct obd_export *doomed_exp = NULL;
1462         struct obd_uuid doomed_uuid;
1463         int exports_evicted = 0;
1464
1465         spin_lock(&obd->obd_dev_lock);
1466         if (obd->obd_stopping) {
1467                 spin_unlock(&obd->obd_dev_lock);
1468                 return exports_evicted;
1469         }
1470         uuid_hash = obd->obd_uuid_hash;
1471         cfs_hash_getref(uuid_hash);
1472         spin_unlock(&obd->obd_dev_lock);
1473
1474         obd_str2uuid(&doomed_uuid, uuid);
1475         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1476                 CERROR("%s: can't evict myself\n", obd->obd_name);
1477                 cfs_hash_putref(uuid_hash);
1478                 return exports_evicted;
1479         }
1480
1481         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1482
1483         if (doomed_exp == NULL) {
1484                 CERROR("%s: can't disconnect %s: no exports found\n",
1485                        obd->obd_name, uuid);
1486         } else {
1487                 CWARN("%s: evicting %s at adminstrative request\n",
1488                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1489                 class_fail_export(doomed_exp);
1490                 class_export_put(doomed_exp);
1491                 exports_evicted++;
1492         }
1493         cfs_hash_putref(uuid_hash);
1494
1495         return exports_evicted;
1496 }
1497 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1498
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback invoked by print_export_data() when lock dumping is
 * requested; it stays NULL until some other code installs a hook.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1503
1504 static void print_export_data(struct obd_export *exp, const char *status,
1505                               int locks)
1506 {
1507         struct ptlrpc_reply_state *rs;
1508         struct ptlrpc_reply_state *first_reply = NULL;
1509         int nreplies = 0;
1510
1511         spin_lock(&exp->exp_lock);
1512         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1513                                 rs_exp_list) {
1514                 if (nreplies == 0)
1515                         first_reply = rs;
1516                 nreplies++;
1517         }
1518         spin_unlock(&exp->exp_lock);
1519
1520         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1521                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1522                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1523                atomic_read(&exp->exp_rpc_count),
1524                atomic_read(&exp->exp_cb_count),
1525                atomic_read(&exp->exp_locks_count),
1526                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1527                nreplies, first_reply, nreplies > 3 ? "..." : "",
1528                exp->exp_last_committed);
1529 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1530         if (locks && class_export_dump_hook != NULL)
1531                 class_export_dump_hook(exp);
1532 #endif
1533 }
1534
1535 void dump_exports(struct obd_device *obd, int locks)
1536 {
1537         struct obd_export *exp;
1538
1539         spin_lock(&obd->obd_dev_lock);
1540         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1541                 print_export_data(exp, "ACTIVE", locks);
1542         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1543                 print_export_data(exp, "UNLINKED", locks);
1544         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1545                 print_export_data(exp, "DELAYED", locks);
1546         spin_unlock(&obd->obd_dev_lock);
1547         spin_lock(&obd_zombie_impexp_lock);
1548         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1549                 print_export_data(exp, "ZOMBIE", locks);
1550         spin_unlock(&obd_zombie_impexp_lock);
1551 }
1552 EXPORT_SYMBOL(dump_exports);
1553
1554 void obd_exports_barrier(struct obd_device *obd)
1555 {
1556         int waited = 2;
1557         LASSERT(list_empty(&obd->obd_exports));
1558         spin_lock(&obd->obd_dev_lock);
1559         while (!list_empty(&obd->obd_unlinked_exports)) {
1560                 spin_unlock(&obd->obd_dev_lock);
1561                 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1562                                                    cfs_time_seconds(waited));
1563                 if (waited > 5 && IS_PO2(waited)) {
1564                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1565                                       "more than %d seconds. "
1566                                       "The obd refcount = %d. Is it stuck?\n",
1567                                       obd->obd_name, waited,
1568                                       atomic_read(&obd->obd_refcount));
1569                         dump_exports(obd, 1);
1570                 }
1571                 waited *= 2;
1572                 spin_lock(&obd->obd_dev_lock);
1573         }
1574         spin_unlock(&obd->obd_dev_lock);
1575 }
1576 EXPORT_SYMBOL(obd_exports_barrier);
1577
/*
 * Total number of zombie imports and exports still waiting to be
 * destroyed; modified under obd_zombie_impexp_lock.
 */
static int zombies_count;
1580
1581 /**
1582  * kill zombie imports and exports
1583  */
1584 void obd_zombie_impexp_cull(void)
1585 {
1586         struct obd_import *import;
1587         struct obd_export *export;
1588         ENTRY;
1589
1590         do {
1591                 spin_lock(&obd_zombie_impexp_lock);
1592
1593                 import = NULL;
1594                 if (!list_empty(&obd_zombie_imports)) {
1595                         import = list_entry(obd_zombie_imports.next,
1596                                                 struct obd_import,
1597                                                 imp_zombie_chain);
1598                         list_del_init(&import->imp_zombie_chain);
1599                 }
1600
1601                 export = NULL;
1602                 if (!list_empty(&obd_zombie_exports)) {
1603                         export = list_entry(obd_zombie_exports.next,
1604                                                 struct obd_export,
1605                                                 exp_obd_chain);
1606                         list_del_init(&export->exp_obd_chain);
1607                 }
1608
1609                 spin_unlock(&obd_zombie_impexp_lock);
1610
1611                 if (import != NULL) {
1612                         class_import_destroy(import);
1613                         spin_lock(&obd_zombie_impexp_lock);
1614                         zombies_count--;
1615                         spin_unlock(&obd_zombie_impexp_lock);
1616                 }
1617
1618                 if (export != NULL) {
1619                         class_export_destroy(export);
1620                         spin_lock(&obd_zombie_impexp_lock);
1621                         zombies_count--;
1622                         spin_unlock(&obd_zombie_impexp_lock);
1623                 }
1624
1625                 cond_resched();
1626         } while (import != NULL || export != NULL);
1627         EXIT;
1628 }
1629
1630 static struct completion        obd_zombie_start;
1631 static struct completion        obd_zombie_stop;
1632 static unsigned long            obd_zombie_flags;
1633 static wait_queue_head_t                obd_zombie_waitq;
1634 static pid_t                    obd_zombie_pid;
1635
1636 enum {
1637         OBD_ZOMBIE_STOP         = 0x0001,
1638 };
1639
1640 /**
1641  * check for work for kill zombie import/export thread.
1642  */
1643 static int obd_zombie_impexp_check(void *arg)
1644 {
1645         int rc;
1646
1647         spin_lock(&obd_zombie_impexp_lock);
1648         rc = (zombies_count == 0) &&
1649              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1650         spin_unlock(&obd_zombie_impexp_lock);
1651
1652         RETURN(rc);
1653 }
1654
1655 /**
1656  * Add export to the obd_zombe thread and notify it.
1657  */
1658 static void obd_zombie_export_add(struct obd_export *exp) {
1659         spin_lock(&exp->exp_obd->obd_dev_lock);
1660         LASSERT(!list_empty(&exp->exp_obd_chain));
1661         list_del_init(&exp->exp_obd_chain);
1662         spin_unlock(&exp->exp_obd->obd_dev_lock);
1663         spin_lock(&obd_zombie_impexp_lock);
1664         zombies_count++;
1665         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1666         spin_unlock(&obd_zombie_impexp_lock);
1667
1668         obd_zombie_impexp_notify();
1669 }
1670
1671 /**
1672  * Add import to the obd_zombe thread and notify it.
1673  */
1674 static void obd_zombie_import_add(struct obd_import *imp) {
1675         LASSERT(imp->imp_sec == NULL);
1676         LASSERT(imp->imp_rq_pool == NULL);
1677         spin_lock(&obd_zombie_impexp_lock);
1678         LASSERT(list_empty(&imp->imp_zombie_chain));
1679         zombies_count++;
1680         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1681         spin_unlock(&obd_zombie_impexp_lock);
1682
1683         obd_zombie_impexp_notify();
1684 }
1685
1686 /**
1687  * notify import/export destroy thread about new zombie.
1688  */
1689 static void obd_zombie_impexp_notify(void)
1690 {
1691         /*
1692          * Make sure obd_zomebie_impexp_thread get this notification.
1693          * It is possible this signal only get by obd_zombie_barrier, and
1694          * barrier gulps this notification and sleeps away and hangs ensues
1695          */
1696         wake_up_all(&obd_zombie_waitq);
1697 }
1698
1699 /**
1700  * check whether obd_zombie is idle
1701  */
1702 static int obd_zombie_is_idle(void)
1703 {
1704         int rc;
1705
1706         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1707         spin_lock(&obd_zombie_impexp_lock);
1708         rc = (zombies_count == 0);
1709         spin_unlock(&obd_zombie_impexp_lock);
1710         return rc;
1711 }
1712
1713 /**
1714  * wait when obd_zombie import/export queues become empty
1715  */
1716 void obd_zombie_barrier(void)
1717 {
1718         struct l_wait_info lwi = { 0 };
1719
1720         if (obd_zombie_pid == current_pid())
1721                 /* don't wait for myself */
1722                 return;
1723         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1724 }
1725 EXPORT_SYMBOL(obd_zombie_barrier);
1726
1727
1728 /**
1729  * destroy zombie export/import thread.
1730  */
1731 static int obd_zombie_impexp_thread(void *unused)
1732 {
1733         unshare_fs_struct();
1734         complete(&obd_zombie_start);
1735
1736         obd_zombie_pid = current_pid();
1737
1738         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1739                 struct l_wait_info lwi = { 0 };
1740
1741                 l_wait_event(obd_zombie_waitq,
1742                              !obd_zombie_impexp_check(NULL), &lwi);
1743                 obd_zombie_impexp_cull();
1744
1745                 /*
1746                  * Notify obd_zombie_barrier callers that queues
1747                  * may be empty.
1748                  */
1749                 wake_up(&obd_zombie_waitq);
1750         }
1751
1752         complete(&obd_zombie_stop);
1753
1754         RETURN(0);
1755 }
1756
1757
1758 /**
1759  * start destroy zombie import/export thread
1760  */
1761 int obd_zombie_impexp_init(void)
1762 {
1763         task_t *task;
1764
1765         INIT_LIST_HEAD(&obd_zombie_imports);
1766         INIT_LIST_HEAD(&obd_zombie_exports);
1767         spin_lock_init(&obd_zombie_impexp_lock);
1768         init_completion(&obd_zombie_start);
1769         init_completion(&obd_zombie_stop);
1770         init_waitqueue_head(&obd_zombie_waitq);
1771         obd_zombie_pid = 0;
1772
1773         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1774         if (IS_ERR(task))
1775                 RETURN(PTR_ERR(task));
1776
1777         wait_for_completion(&obd_zombie_start);
1778         RETURN(0);
1779 }
1780 /**
1781  * stop destroy zombie import/export thread
1782  */
1783 void obd_zombie_impexp_stop(void)
1784 {
1785         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1786         obd_zombie_impexp_notify();
1787         wait_for_completion(&obd_zombie_stop);
1788 }
1789
1790 /***** Kernel-userspace comm helpers *******/
1791
1792 /* Get length of entire message, including header */
1793 int kuc_len(int payload_len)
1794 {
1795         return sizeof(struct kuc_hdr) + payload_len;
1796 }
1797 EXPORT_SYMBOL(kuc_len);
1798
1799 /* Get a pointer to kuc header, given a ptr to the payload
1800  * @param p Pointer to payload area
1801  * @returns Pointer to kuc header
1802  */
1803 struct kuc_hdr * kuc_ptr(void *p)
1804 {
1805         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1806         LASSERT(lh->kuc_magic == KUC_MAGIC);
1807         return lh;
1808 }
1809 EXPORT_SYMBOL(kuc_ptr);
1810
1811 /* Test if payload is part of kuc message
1812  * @param p Pointer to payload area
1813  * @returns boolean
1814  */
1815 int kuc_ispayload(void *p)
1816 {
1817         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1818
1819         if (kh->kuc_magic == KUC_MAGIC)
1820                 return 1;
1821         else
1822                 return 0;
1823 }
1824 EXPORT_SYMBOL(kuc_ispayload);
1825
1826 /* Alloc space for a message, and fill in header
1827  * @return Pointer to payload area
1828  */
1829 void *kuc_alloc(int payload_len, int transport, int type)
1830 {
1831         struct kuc_hdr *lh;
1832         int len = kuc_len(payload_len);
1833
1834         OBD_ALLOC(lh, len);
1835         if (lh == NULL)
1836                 return ERR_PTR(-ENOMEM);
1837
1838         lh->kuc_magic = KUC_MAGIC;
1839         lh->kuc_transport = transport;
1840         lh->kuc_msgtype = type;
1841         lh->kuc_msglen = len;
1842
1843         return (void *)(lh + 1);
1844 }
1845 EXPORT_SYMBOL(kuc_alloc);
1846
/* Free a kuc message, given a pointer to its payload area
 * @param p Pointer to payload area; the header in front of it is
 *          located (and its magic asserted) through kuc_ptr()
 * @param payload_len Payload size; presumably the same value that was
 *                    passed to kuc_alloc() - verify against callers
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);
        int len = kuc_len(payload_len);

        OBD_FREE(lh, len);
}
EXPORT_SYMBOL(kuc_free);