// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

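/*
 * Each lockspace is a kobject in the "dlm" kset created in
 * dlm_lockspace_init() (parented to kernel_kobj, so the attributes below
 * appear under /sys/kernel/dlm/<lockspace>/).  They are the kernel half
 * of the handshake with dlm_controld: writing 0 or 1 to "control" stops
 * or starts the lockspace, and a write to "event_done" delivers the
 * result of a join/leave uevent (see do_uevent() below).
 */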
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

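/*
 * Join/leave handshake with dlm_controld: emit a KOBJ_ONLINE or
 * KOBJ_OFFLINE uevent, then sleep until the daemon has done the group
 * management and reported the outcome through the "event_done" sysfs
 * file.  dlm_event_store() records the result, sets LSFL_UEVENT_WAIT
 * and wakes us.
 */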
static int do_uevent(struct dlm_ls *ls, int in)
{
        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        wait_event(ls->ls_uevent_wait,
                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

        return ls->ls_uevent_result;
}

static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

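/*
 * dlm_scand is a single kernel thread that walks lslist and, for each
 * lockspace whose ci_scan_secs interval has elapsed, scans its rsbs and
 * lock timeouts.  If a lockspace is busy with recovery, its next scan is
 * simply pushed back by a second.
 */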
static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

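/*
 * The dlm_find_lockspace_*() helpers below look a lockspace up in lslist
 * by global id, local handle or misc device minor.  Each one takes an
 * ls_count reference on success, which the caller must drop with
 * dlm_put_lockspace().
 */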
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        if (atomic_dec_and_test(&ls->ls_count))
                wake_up(&ls->ls_count_wait);
}

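/*
 * Wait for the last ls_count reference to be dropped, then unlink the
 * lockspace from lslist.  The count is rechecked under lslist_lock since
 * a dlm_find_lockspace_*() caller can take a new reference between the
 * wakeup and the recheck, in which case we wait again.
 */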
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
        wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

        spin_lock(&lslist_lock);
        if (atomic_read(&ls->ls_count) != 0) {
                spin_unlock(&lslist_lock);
                goto retry;
        }

        WARN_ON(ls->ls_create_count != 0);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
}

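/*
 * Run by dlm_new_lockspace() when the first lockspace is created: start
 * the dlm_scand thread and the midcomms/lowcomms layer shared by all
 * lockspaces.
 */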
static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_midcomms_start();
        if (error) {
                log_print("cannot start dlm midcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

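/*
 * The join sequence: validate the arguments, reuse an existing lockspace
 * of the same name if there is one (unless DLM_LSFL_NEWEXCL is set),
 * otherwise allocate and initialize a new dlm_ls, start the callback and
 * recoverd threads, register the kobject, run the join uevent handshake
 * with dlm_controld, and wait for the initial recovery to complete.
 */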
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        atomic_set(&ls->ls_count, 0);
        init_waitqueue_head(&ls->ls_count_wait);
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

#ifdef CONFIG_DLM_DEPRECATED_API
        if (flags & DLM_LSFL_TIMEWARN) {
                pr_warn_once("===============================================================\n"
                             "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
                             "         will be removed in v6.2!\n"
                             "         This includes the DLM_LSFL_TIMEWARN define in the UAPI header!\n"
                             "===============================================================\n");

                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
        }

        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));
#else
        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif

        size = READ_ONCE(dlm_config.ci_rsbtbl_size);
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);
        init_waitqueue_head(&ls->ls_remove_wait);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);
#endif

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_recovery_done);
        ls->ls_recovery_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        atomic_set(&ls->ls_requestqueue_cnt, 0);
        init_waitqueue_head(&ls->ls_requestqueue_wait);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        /* For backwards compatibility with 3.1 we need to use the maximum
         * possible dlm message size to be sure the message will fit and
         * we will not get out of bounds issues. However, the 3.2 sending
         * side might send less.
         */
        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        /* wait until recovery is successful or failed */
        wait_for_completion(&ls->ls_recovery_done);
        error = ls->ls_recovery_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

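/*
 * A minimal usage sketch (illustrative only; the names, ops and lvblen
 * value below are made up):
 *
 *	dlm_lockspace_t *ls;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("myfs", "mycluster", DLM_LSFL_FS, 32,
 *				  &my_recover_ops, my_arg, &ops_result, &ls);
 *
 * lvblen must be a nonzero multiple of 8, and when recover callbacks are
 * enabled a given cluster name must match the one configured through
 * configfs, or the join fails with -EBADR.
 */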
int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
                dlm_lowcomms_stop();
        }
 out:
        mutex_unlock(&ls_lock);
        return error;
}

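/*
 * idr_for_each() callbacks used by lockspace_busy() and
 * release_lockspace(): lkb_idr_is_local matches locks mastered locally
 * that have been granted a mode, lkb_idr_is_any matches every lkb, and
 * lkb_idr_free releases an lkb (and the lvb of a master copy) during
 * final teardown.
 */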
static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

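/*
 * Teardown: drop the create count under lslist_lock (only the last
 * release, with no busy lkbs, proceeds), deregister the misc device,
 * leave the group via an offline uevent, stop the recoverd, scand and
 * callback threads, unlink the lockspace, then free all lkbs, rsbs and
 * remaining per-lockspace structures.
 */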
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_clear_members(ls);
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is released */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                dlm_lowcomms_stop();
        mutex_unlock(&ls_lock);

        return error;
}

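/*
 * Called when the userland control daemon has gone away: stop every
 * running lockspace, since nothing is left to drive recovery.  The list
 * walk restarts after each dlm_ls_stop() because lslist_lock must be
 * dropped around that call.
 */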
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}

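/*
 * Debug check that every lockspace really is stopped: warn if
 * ls_in_recovery is not held or dlm_locking_stopped() is false.
 */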
void dlm_stop_lockspaces_check(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
                            !dlm_locking_stopped(ls)))
                        break;
        }
        spin_unlock(&lslist_lock);
}