Linux 6.12-rc1
[linux-block.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
/* Number of lockspaces; changed under ls_lock by __dlm_new_lockspace() and
 * dlm_release_lockspace().
 */
static int                      ls_count;
/* Serializes lockspace creation and release. */
static struct mutex             ls_lock;
/* All lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Protects lslist and the ls_create_count of its entries. */
static spinlock_t               lslist_lock;
32
33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34 {
35         ssize_t ret = len;
36         int n;
37         int rc = kstrtoint(buf, 0, &n);
38
39         if (rc)
40                 return rc;
41         ls = dlm_find_lockspace_local(ls);
42         if (!ls)
43                 return -EINVAL;
44
45         switch (n) {
46         case 0:
47                 dlm_ls_stop(ls);
48                 break;
49         case 1:
50                 dlm_ls_start(ls);
51                 break;
52         default:
53                 ret = -EINVAL;
54         }
55         dlm_put_lockspace(ls);
56         return ret;
57 }
58
59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 {
61         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63         if (rc)
64                 return rc;
65         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66         wake_up(&ls->ls_uevent_wait);
67         return len;
68 }
69
70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71 {
72         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73 }
74
75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76 {
77         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79         if (rc)
80                 return rc;
81         return len;
82 }
83
84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85 {
86         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87 }
88
89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90 {
91         int val;
92         int rc = kstrtoint(buf, 0, &val);
93
94         if (rc)
95                 return rc;
96         if (val == 1)
97                 set_bit(LSFL_NODIR, &ls->ls_flags);
98         return len;
99 }
100
101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102 {
103         uint32_t status = dlm_recover_status(ls);
104         return snprintf(buf, PAGE_SIZE, "%x\n", status);
105 }
106
107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108 {
109         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110 }
111
/*
 * A lockspace sysfs attribute: the generic attribute plus per-lockspace
 * show/store handlers.  Either handler may be NULL; dlm_attr_show() and
 * dlm_attr_store() check before calling.
 */
struct dlm_attr {
        struct attribute attr;
        /* formats the value into the buffer, returns bytes written */
        ssize_t (*show)(struct dlm_ls *, char *);
        /* parses the written buffer, returns bytes consumed or -errno */
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
117
/* "control": write-only; 0 stops and 1 starts the lockspace */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

/* "event_done": write-only; result of the uevent that do_uevent() waits on */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

/* "id": read/write access to ls_global_id */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

/* "nodir": reads dlm_no_directory(); writing 1 sets LSFL_NODIR */
static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

/* "recover_status": read-only, hexadecimal recovery status */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only view of ls_recover_nodeid */
static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

/* NULL-terminated list of the attributes above */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
/* provides the dlm_groups referenced by dlm_ktype.default_groups */
ATTRIBUTE_GROUPS(dlm);
160
161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162                              char *buf)
163 {
164         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166         return a->show ? a->show(ls, buf) : 0;
167 }
168
169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170                               const char *buf, size_t len)
171 {
172         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174         return a->store ? a->store(ls, buf, len) : len;
175 }
176
/* sysfs operations for lockspace kobjects; both dispatch via struct dlm_attr */
static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};
181
182 static struct kobj_type dlm_ktype = {
183         .default_groups = dlm_groups,
184         .sysfs_ops     = &dlm_attr_ops,
185 };
186
/* "dlm" kset created in dlm_lockspace_init(); parent set of every lockspace kobject */
static struct kset *dlm_kset;
188
189 static int do_uevent(struct dlm_ls *ls, int in)
190 {
191         if (in)
192                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193         else
194                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198         /* dlm_controld will see the uevent, do the necessary group management
199            and then write to sysfs to wake us */
200
201         wait_event(ls->ls_uevent_wait,
202                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206         return ls->ls_uevent_result;
207 }
208
209 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210 {
211         const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214         return 0;
215 }
216
/* uevent operations of the "dlm" kset; see dlm_uevent() */
static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};
220
221 int __init dlm_lockspace_init(void)
222 {
223         ls_count = 0;
224         mutex_init(&ls_lock);
225         INIT_LIST_HEAD(&lslist);
226         spin_lock_init(&lslist_lock);
227
228         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229         if (!dlm_kset) {
230                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
231                 return -ENOMEM;
232         }
233         return 0;
234 }
235
/* Undo dlm_lockspace_init(): unregister the "dlm" kset. */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
240
241 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242 {
243         struct dlm_ls *ls;
244
245         spin_lock_bh(&lslist_lock);
246
247         list_for_each_entry(ls, &lslist, ls_list) {
248                 if (ls->ls_global_id == id) {
249                         atomic_inc(&ls->ls_count);
250                         goto out;
251                 }
252         }
253         ls = NULL;
254  out:
255         spin_unlock_bh(&lslist_lock);
256         return ls;
257 }
258
259 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260 {
261         struct dlm_ls *ls = lockspace;
262
263         atomic_inc(&ls->ls_count);
264         return ls;
265 }
266
267 struct dlm_ls *dlm_find_lockspace_device(int minor)
268 {
269         struct dlm_ls *ls;
270
271         spin_lock_bh(&lslist_lock);
272         list_for_each_entry(ls, &lslist, ls_list) {
273                 if (ls->ls_device.minor == minor) {
274                         atomic_inc(&ls->ls_count);
275                         goto out;
276                 }
277         }
278         ls = NULL;
279  out:
280         spin_unlock_bh(&lslist_lock);
281         return ls;
282 }
283
284 void dlm_put_lockspace(struct dlm_ls *ls)
285 {
286         if (atomic_dec_and_test(&ls->ls_count))
287                 wake_up(&ls->ls_count_wait);
288 }
289
290 static void remove_lockspace(struct dlm_ls *ls)
291 {
292 retry:
293         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295         spin_lock_bh(&lslist_lock);
296         if (atomic_read(&ls->ls_count) != 0) {
297                 spin_unlock_bh(&lslist_lock);
298                 goto retry;
299         }
300
301         WARN_ON(ls->ls_create_count != 0);
302         list_del(&ls->ls_list);
303         spin_unlock_bh(&lslist_lock);
304 }
305
/*
 * Start midcomms, which sends/receives messages for all lockspaces.  Logs
 * and returns the error from dlm_midcomms_start(), 0 on success.
 */
static int threads_start(void)
{
	int rv = dlm_midcomms_start();

	if (rv)
		log_print("cannot start dlm midcomms %d", rv);

	return rv;
}
317
318 static int lkb_idr_free(struct dlm_lkb *lkb)
319 {
320         if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321                 dlm_free_lvb(lkb->lkb_lvbptr);
322
323         dlm_free_lkb(lkb);
324         return 0;
325 }
326
/*
 * Per-element callback for rhashtable_free_and_destroy(): @ptr is the rsb to
 * free, @arg is unused.
 */
static void rhash_free_rsb(void *ptr, void *arg)
{
	dlm_free_rsb(ptr);
}
333
334 static void free_lockspace(struct work_struct *work)
335 {
336         struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
337         struct dlm_lkb *lkb;
338         unsigned long id;
339
340         /*
341          * Free all lkb's in xa
342          */
343         xa_for_each(&ls->ls_lkbxa, id, lkb) {
344                 lkb_idr_free(lkb);
345         }
346         xa_destroy(&ls->ls_lkbxa);
347
348         /*
349          * Free all rsb's on rsbtbl
350          */
351         rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353         kfree(ls);
354 }
355
356 static int new_lockspace(const char *name, const char *cluster,
357                          uint32_t flags, int lvblen,
358                          const struct dlm_lockspace_ops *ops, void *ops_arg,
359                          int *ops_result, dlm_lockspace_t **lockspace)
360 {
361         struct dlm_ls *ls;
362         int namelen = strlen(name);
363         int error;
364
365         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366                 return -EINVAL;
367
368         if (lvblen % 8)
369                 return -EINVAL;
370
371         if (!try_module_get(THIS_MODULE))
372                 return -EINVAL;
373
374         if (!dlm_user_daemon_available()) {
375                 log_print("dlm user daemon not available");
376                 error = -EUNATCH;
377                 goto out;
378         }
379
380         if (ops && ops_result) {
381                 if (!dlm_config.ci_recover_callbacks)
382                         *ops_result = -EOPNOTSUPP;
383                 else
384                         *ops_result = 0;
385         }
386
387         if (!cluster)
388                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389                           dlm_config.ci_cluster_name);
390
391         if (dlm_config.ci_recover_callbacks && cluster &&
392             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393                 log_print("dlm cluster name '%s' does not match "
394                           "the application cluster name '%s'",
395                           dlm_config.ci_cluster_name, cluster);
396                 error = -EBADR;
397                 goto out;
398         }
399
400         error = 0;
401
402         spin_lock_bh(&lslist_lock);
403         list_for_each_entry(ls, &lslist, ls_list) {
404                 WARN_ON(ls->ls_create_count <= 0);
405                 if (ls->ls_namelen != namelen)
406                         continue;
407                 if (memcmp(ls->ls_name, name, namelen))
408                         continue;
409                 if (flags & DLM_LSFL_NEWEXCL) {
410                         error = -EEXIST;
411                         break;
412                 }
413                 ls->ls_create_count++;
414                 *lockspace = ls;
415                 error = 1;
416                 break;
417         }
418         spin_unlock_bh(&lslist_lock);
419
420         if (error)
421                 goto out;
422
423         error = -ENOMEM;
424
425         ls = kzalloc(sizeof(*ls), GFP_NOFS);
426         if (!ls)
427                 goto out;
428         memcpy(ls->ls_name, name, namelen);
429         ls->ls_namelen = namelen;
430         ls->ls_lvblen = lvblen;
431         atomic_set(&ls->ls_count, 0);
432         init_waitqueue_head(&ls->ls_count_wait);
433         ls->ls_flags = 0;
434
435         if (ops && dlm_config.ci_recover_callbacks) {
436                 ls->ls_ops = ops;
437                 ls->ls_ops_arg = ops_arg;
438         }
439
440         if (flags & DLM_LSFL_SOFTIRQ)
441                 set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443         /* ls_exflags are forced to match among nodes, and we don't
444          * need to require all nodes to have some flags set
445          */
446         ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447                                     DLM_LSFL_SOFTIRQ));
448
449         INIT_LIST_HEAD(&ls->ls_slow_inactive);
450         INIT_LIST_HEAD(&ls->ls_slow_active);
451         rwlock_init(&ls->ls_rsbtbl_lock);
452
453         error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454         if (error)
455                 goto out_lsfree;
456
457         xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458         rwlock_init(&ls->ls_lkbxa_lock);
459
460         INIT_LIST_HEAD(&ls->ls_waiters);
461         spin_lock_init(&ls->ls_waiters_lock);
462         INIT_LIST_HEAD(&ls->ls_orphans);
463         spin_lock_init(&ls->ls_orphans_lock);
464
465         INIT_LIST_HEAD(&ls->ls_nodes);
466         INIT_LIST_HEAD(&ls->ls_nodes_gone);
467         ls->ls_num_nodes = 0;
468         ls->ls_low_nodeid = 0;
469         ls->ls_total_weight = 0;
470         ls->ls_node_array = NULL;
471
472         memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473         ls->ls_local_rsb.res_ls = ls;
474
475         ls->ls_debug_rsb_dentry = NULL;
476         ls->ls_debug_waiters_dentry = NULL;
477
478         init_waitqueue_head(&ls->ls_uevent_wait);
479         ls->ls_uevent_result = 0;
480         init_completion(&ls->ls_recovery_done);
481         ls->ls_recovery_result = -1;
482
483         spin_lock_init(&ls->ls_cb_lock);
484         INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486         INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488         ls->ls_recoverd_task = NULL;
489         mutex_init(&ls->ls_recoverd_active);
490         spin_lock_init(&ls->ls_recover_lock);
491         spin_lock_init(&ls->ls_rcom_spin);
492         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493         ls->ls_recover_status = 0;
494         ls->ls_recover_seq = get_random_u64();
495         ls->ls_recover_args = NULL;
496         init_rwsem(&ls->ls_in_recovery);
497         rwlock_init(&ls->ls_recv_active);
498         INIT_LIST_HEAD(&ls->ls_requestqueue);
499         rwlock_init(&ls->ls_requestqueue_lock);
500         spin_lock_init(&ls->ls_clear_proc_locks);
501
502         /* Due backwards compatibility with 3.1 we need to use maximum
503          * possible dlm message size to be sure the message will fit and
504          * not having out of bounds issues. However on sending side 3.2
505          * might send less.
506          */
507         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508         if (!ls->ls_recover_buf) {
509                 error = -ENOMEM;
510                 goto out_lkbxa;
511         }
512
513         ls->ls_slot = 0;
514         ls->ls_num_slots = 0;
515         ls->ls_slots_size = 0;
516         ls->ls_slots = NULL;
517
518         INIT_LIST_HEAD(&ls->ls_recover_list);
519         spin_lock_init(&ls->ls_recover_list_lock);
520         xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521         spin_lock_init(&ls->ls_recover_xa_lock);
522         ls->ls_recover_list_count = 0;
523         init_waitqueue_head(&ls->ls_wait_general);
524         INIT_LIST_HEAD(&ls->ls_masters_list);
525         rwlock_init(&ls->ls_masters_lock);
526         INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527         rwlock_init(&ls->ls_dir_dump_lock);
528
529         INIT_LIST_HEAD(&ls->ls_scan_list);
530         spin_lock_init(&ls->ls_scan_lock);
531         timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533         spin_lock_bh(&lslist_lock);
534         ls->ls_create_count = 1;
535         list_add(&ls->ls_list, &lslist);
536         spin_unlock_bh(&lslist_lock);
537
538         if (flags & DLM_LSFL_FS)
539                 set_bit(LSFL_FS, &ls->ls_flags);
540
541         error = dlm_callback_start(ls);
542         if (error) {
543                 log_error(ls, "can't start dlm_callback %d", error);
544                 goto out_delist;
545         }
546
547         init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549         /*
550          * Once started, dlm_recoverd first looks for ls in lslist, then
551          * initializes ls_in_recovery as locked in "down" mode.  We need
552          * to wait for the wakeup from dlm_recoverd because in_recovery
553          * has to start out in down mode.
554          */
555
556         error = dlm_recoverd_start(ls);
557         if (error) {
558                 log_error(ls, "can't start dlm_recoverd %d", error);
559                 goto out_callback;
560         }
561
562         wait_event(ls->ls_recover_lock_wait,
563                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565         ls->ls_kobj.kset = dlm_kset;
566         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567                                      "%s", ls->ls_name);
568         if (error)
569                 goto out_recoverd;
570         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572         /* This uevent triggers dlm_controld in userspace to add us to the
573            group of nodes that are members of this lockspace (managed by the
574            cluster infrastructure.)  Once it's done that, it tells us who the
575            current lockspace members are (via configfs) and then tells the
576            lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578         error = do_uevent(ls, 1);
579         if (error)
580                 goto out_recoverd;
581
582         /* wait until recovery is successful or failed */
583         wait_for_completion(&ls->ls_recovery_done);
584         error = ls->ls_recovery_result;
585         if (error)
586                 goto out_members;
587
588         dlm_create_debug_file(ls);
589
590         log_rinfo(ls, "join complete");
591         *lockspace = ls;
592         return 0;
593
594  out_members:
595         do_uevent(ls, 0);
596         dlm_clear_members(ls);
597         kfree(ls->ls_node_array);
598  out_recoverd:
599         dlm_recoverd_stop(ls);
600  out_callback:
601         dlm_callback_stop(ls);
602  out_delist:
603         spin_lock_bh(&lslist_lock);
604         list_del(&ls->ls_list);
605         spin_unlock_bh(&lslist_lock);
606         xa_destroy(&ls->ls_recover_xa);
607         kfree(ls->ls_recover_buf);
608  out_lkbxa:
609         xa_destroy(&ls->ls_lkbxa);
610         rhashtable_destroy(&ls->ls_rsbtbl);
611  out_lsfree:
612         kobject_put(&ls->ls_kobj);
613         kfree(ls);
614  out:
615         module_put(THIS_MODULE);
616         return error;
617 }
618
619 static int __dlm_new_lockspace(const char *name, const char *cluster,
620                                uint32_t flags, int lvblen,
621                                const struct dlm_lockspace_ops *ops,
622                                void *ops_arg, int *ops_result,
623                                dlm_lockspace_t **lockspace)
624 {
625         int error = 0;
626
627         mutex_lock(&ls_lock);
628         if (!ls_count)
629                 error = threads_start();
630         if (error)
631                 goto out;
632
633         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634                               ops_result, lockspace);
635         if (!error)
636                 ls_count++;
637         if (error > 0)
638                 error = 0;
639         if (!ls_count) {
640                 dlm_midcomms_shutdown();
641                 dlm_midcomms_stop();
642         }
643  out:
644         mutex_unlock(&ls_lock);
645         return error;
646 }
647
648 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649                       int lvblen, const struct dlm_lockspace_ops *ops,
650                       void *ops_arg, int *ops_result,
651                       dlm_lockspace_t **lockspace)
652 {
653         return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654                                    ops, ops_arg, ops_result, lockspace);
655 }
656
657 int dlm_new_user_lockspace(const char *name, const char *cluster,
658                            uint32_t flags, int lvblen,
659                            const struct dlm_lockspace_ops *ops,
660                            void *ops_arg, int *ops_result,
661                            dlm_lockspace_t **lockspace)
662 {
663         if (flags & DLM_LSFL_SOFTIRQ)
664                 return -EINVAL;
665
666         return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667                                    ops_arg, ops_result, lockspace);
668 }
669
670 /* NOTE: We check the lkbxa here rather than the resource table.
671    This is because there may be LKBs queued as ASTs that have been unlinked
672    from their RSBs and are pending deletion once the AST has been delivered */
673
674 static int lockspace_busy(struct dlm_ls *ls, int force)
675 {
676         struct dlm_lkb *lkb;
677         unsigned long id;
678         int rv = 0;
679
680         read_lock_bh(&ls->ls_lkbxa_lock);
681         if (force == 0) {
682                 xa_for_each(&ls->ls_lkbxa, id, lkb) {
683                         rv = 1;
684                         break;
685                 }
686         } else if (force == 1) {
687                 xa_for_each(&ls->ls_lkbxa, id, lkb) {
688                         if (lkb->lkb_nodeid == 0 &&
689                             lkb->lkb_grmode != DLM_LOCK_IV) {
690                                 rv = 1;
691                                 break;
692                         }
693                 }
694         } else {
695                 rv = 0;
696         }
697         read_unlock_bh(&ls->ls_lkbxa_lock);
698         return rv;
699 }
700
/*
 * Drop one creation reference on the lockspace and tear it down when it was
 * the last one.  Called with ls_lock held by dlm_release_lockspace().
 *
 * Returns 0 when the lockspace was torn down, -EBUSY when it was the last
 * reference but lockspace_busy() reported locks, -EINVAL when
 * ls_create_count was not positive, or the remaining (positive)
 * ls_create_count when other creation references are left.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        int busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock_bh(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock_bh(&lslist_lock);

        /* anything but 0 means the lockspace stays */
        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        /* last lockspace overall */
        if (ls_count == 1)
                dlm_midcomms_version_wait();

        dlm_device_deregister(ls);

        /* tell userspace we are leaving, unless this is a forced shutdown
         * (force == 3) or no daemon is there to answer
         */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* clear the LSFL_RUNNING flag to speed up
         * timer_shutdown_sync(), we don't care anymore
         */
        clear_bit(LSFL_RUNNING, &ls->ls_flags);
        timer_shutdown_sync(&ls->ls_scan_timer);

        if (ls_count == 1) {
                dlm_clear_members(ls);
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        /* waits for all ls_count references to go away, then delists ls */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* dlm_ktype has no release callback, so ls itself stays allocated
         * and is still used below
         */
        kobject_put(&ls->ls_kobj);

        xa_destroy(&ls->ls_recover_xa);
        kfree(ls->ls_recover_buf);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);

        log_rinfo(ls, "%s final free", __func__);

        /* delayed free of data structures see free_lockspace() */
        queue_work(dlm_wq, &ls->ls_free_work);
        /* drops the module reference new_lockspace() kept on success */
        module_put(THIS_MODULE);
        return 0;
}
777
778 /*
779  * Called when a system has released all its locks and is not going to use the
780  * lockspace any longer.  We free everything we're managing for this lockspace.
781  * Remaining nodes will go through the recovery process as if we'd died.  The
782  * lockspace must continue to function as usual, participating in recoveries,
783  * until this returns.
784  *
785  * Force has 4 possible values:
786  * 0 - don't destroy lockspace if it has any LKBs
787  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788  * 2 - destroy lockspace regardless of LKBs
789  * 3 - destroy lockspace as part of a forced shutdown
790  */
791
792 int dlm_release_lockspace(void *lockspace, int force)
793 {
794         struct dlm_ls *ls;
795         int error;
796
797         ls = dlm_find_lockspace_local(lockspace);
798         if (!ls)
799                 return -EINVAL;
800         dlm_put_lockspace(ls);
801
802         mutex_lock(&ls_lock);
803         error = release_lockspace(ls, force);
804         if (!error)
805                 ls_count--;
806         if (!ls_count)
807                 dlm_midcomms_stop();
808         mutex_unlock(&ls_lock);
809
810         return error;
811 }
812
813 void dlm_stop_lockspaces(void)
814 {
815         struct dlm_ls *ls;
816         int count;
817
818  restart:
819         count = 0;
820         spin_lock_bh(&lslist_lock);
821         list_for_each_entry(ls, &lslist, ls_list) {
822                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823                         count++;
824                         continue;
825                 }
826                 spin_unlock_bh(&lslist_lock);
827                 log_error(ls, "no userland control daemon, stopping lockspace");
828                 dlm_ls_stop(ls);
829                 goto restart;
830         }
831         spin_unlock_bh(&lslist_lock);
832
833         if (count)
834                 log_print("dlm user daemon left %d lockspaces", count);
835 }