Merge tag 'driver-core-6.3-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-block.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164                              char *buf)
165 {
166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168         return a->show ? a->show(ls, buf) : 0;
169 }
170
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172                               const char *buf, size_t len)
173 {
174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176         return a->store ? a->store(ls, buf, len) : len;
177 }
178
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182         kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186         .show  = dlm_attr_show,
187         .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191         .default_groups = dlm_groups,
192         .sysfs_ops     = &dlm_attr_ops,
193         .release       = lockspace_kobj_release,
194 };
195
/* The "dlm" kset, created in dlm_lockspace_init(); lockspace kobjects join it. */
static struct kset *dlm_kset;
197
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200         if (in)
201                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202         else
203                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207         /* dlm_controld will see the uevent, do the necessary group management
208            and then write to sysfs to wake us */
209
210         wait_event(ls->ls_uevent_wait,
211                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215         return ls->ls_uevent_result;
216 }
217
218 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
219 {
220         const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
221
222         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223         return 0;
224 }
225
226 static const struct kset_uevent_ops dlm_uevent_ops = {
227         .uevent = dlm_uevent,
228 };
229
230 int __init dlm_lockspace_init(void)
231 {
232         ls_count = 0;
233         mutex_init(&ls_lock);
234         INIT_LIST_HEAD(&lslist);
235         spin_lock_init(&lslist_lock);
236
237         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
238         if (!dlm_kset) {
239                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
240                 return -ENOMEM;
241         }
242         return 0;
243 }
244
245 void dlm_lockspace_exit(void)
246 {
247         kset_unregister(dlm_kset);
248 }
249
250 static struct dlm_ls *find_ls_to_scan(void)
251 {
252         struct dlm_ls *ls;
253
254         spin_lock(&lslist_lock);
255         list_for_each_entry(ls, &lslist, ls_list) {
256                 if (time_after_eq(jiffies, ls->ls_scan_time +
257                                             dlm_config.ci_scan_secs * HZ)) {
258                         spin_unlock(&lslist_lock);
259                         return ls;
260                 }
261         }
262         spin_unlock(&lslist_lock);
263         return NULL;
264 }
265
266 static int dlm_scand(void *data)
267 {
268         struct dlm_ls *ls;
269
270         while (!kthread_should_stop()) {
271                 ls = find_ls_to_scan();
272                 if (ls) {
273                         if (dlm_lock_recovery_try(ls)) {
274                                 ls->ls_scan_time = jiffies;
275                                 dlm_scan_rsbs(ls);
276                                 dlm_scan_timeout(ls);
277                                 dlm_unlock_recovery(ls);
278                         } else {
279                                 ls->ls_scan_time += HZ;
280                         }
281                         continue;
282                 }
283                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
284         }
285         return 0;
286 }
287
288 static int dlm_scand_start(void)
289 {
290         struct task_struct *p;
291         int error = 0;
292
293         p = kthread_run(dlm_scand, NULL, "dlm_scand");
294         if (IS_ERR(p))
295                 error = PTR_ERR(p);
296         else
297                 scand_task = p;
298         return error;
299 }
300
301 static void dlm_scand_stop(void)
302 {
303         kthread_stop(scand_task);
304 }
305
306 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
307 {
308         struct dlm_ls *ls;
309
310         spin_lock(&lslist_lock);
311
312         list_for_each_entry(ls, &lslist, ls_list) {
313                 if (ls->ls_global_id == id) {
314                         atomic_inc(&ls->ls_count);
315                         goto out;
316                 }
317         }
318         ls = NULL;
319  out:
320         spin_unlock(&lslist_lock);
321         return ls;
322 }
323
324 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
325 {
326         struct dlm_ls *ls;
327
328         spin_lock(&lslist_lock);
329         list_for_each_entry(ls, &lslist, ls_list) {
330                 if (ls->ls_local_handle == lockspace) {
331                         atomic_inc(&ls->ls_count);
332                         goto out;
333                 }
334         }
335         ls = NULL;
336  out:
337         spin_unlock(&lslist_lock);
338         return ls;
339 }
340
341 struct dlm_ls *dlm_find_lockspace_device(int minor)
342 {
343         struct dlm_ls *ls;
344
345         spin_lock(&lslist_lock);
346         list_for_each_entry(ls, &lslist, ls_list) {
347                 if (ls->ls_device.minor == minor) {
348                         atomic_inc(&ls->ls_count);
349                         goto out;
350                 }
351         }
352         ls = NULL;
353  out:
354         spin_unlock(&lslist_lock);
355         return ls;
356 }
357
358 void dlm_put_lockspace(struct dlm_ls *ls)
359 {
360         if (atomic_dec_and_test(&ls->ls_count))
361                 wake_up(&ls->ls_count_wait);
362 }
363
364 static void remove_lockspace(struct dlm_ls *ls)
365 {
366 retry:
367         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
368
369         spin_lock(&lslist_lock);
370         if (atomic_read(&ls->ls_count) != 0) {
371                 spin_unlock(&lslist_lock);
372                 goto retry;
373         }
374
375         WARN_ON(ls->ls_create_count != 0);
376         list_del(&ls->ls_list);
377         spin_unlock(&lslist_lock);
378 }
379
/*
 * Start the services shared by all lockspaces: midcomms first, then the
 * dlm_scand thread.  If dlm_scand cannot be started, midcomms is stopped
 * again.  Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		dlm_midcomms_stop();
		return error;
	}

	return 0;
}
404
405 static int new_lockspace(const char *name, const char *cluster,
406                          uint32_t flags, int lvblen,
407                          const struct dlm_lockspace_ops *ops, void *ops_arg,
408                          int *ops_result, dlm_lockspace_t **lockspace)
409 {
410         struct dlm_ls *ls;
411         int i, size, error;
412         int do_unreg = 0;
413         int namelen = strlen(name);
414
415         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
416                 return -EINVAL;
417
418         if (lvblen % 8)
419                 return -EINVAL;
420
421         if (!try_module_get(THIS_MODULE))
422                 return -EINVAL;
423
424         if (!dlm_user_daemon_available()) {
425                 log_print("dlm user daemon not available");
426                 error = -EUNATCH;
427                 goto out;
428         }
429
430         if (ops && ops_result) {
431                 if (!dlm_config.ci_recover_callbacks)
432                         *ops_result = -EOPNOTSUPP;
433                 else
434                         *ops_result = 0;
435         }
436
437         if (!cluster)
438                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
439                           dlm_config.ci_cluster_name);
440
441         if (dlm_config.ci_recover_callbacks && cluster &&
442             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443                 log_print("dlm cluster name '%s' does not match "
444                           "the application cluster name '%s'",
445                           dlm_config.ci_cluster_name, cluster);
446                 error = -EBADR;
447                 goto out;
448         }
449
450         error = 0;
451
452         spin_lock(&lslist_lock);
453         list_for_each_entry(ls, &lslist, ls_list) {
454                 WARN_ON(ls->ls_create_count <= 0);
455                 if (ls->ls_namelen != namelen)
456                         continue;
457                 if (memcmp(ls->ls_name, name, namelen))
458                         continue;
459                 if (flags & DLM_LSFL_NEWEXCL) {
460                         error = -EEXIST;
461                         break;
462                 }
463                 ls->ls_create_count++;
464                 *lockspace = ls;
465                 error = 1;
466                 break;
467         }
468         spin_unlock(&lslist_lock);
469
470         if (error)
471                 goto out;
472
473         error = -ENOMEM;
474
475         ls = kzalloc(sizeof(*ls), GFP_NOFS);
476         if (!ls)
477                 goto out;
478         memcpy(ls->ls_name, name, namelen);
479         ls->ls_namelen = namelen;
480         ls->ls_lvblen = lvblen;
481         atomic_set(&ls->ls_count, 0);
482         init_waitqueue_head(&ls->ls_count_wait);
483         ls->ls_flags = 0;
484         ls->ls_scan_time = jiffies;
485
486         if (ops && dlm_config.ci_recover_callbacks) {
487                 ls->ls_ops = ops;
488                 ls->ls_ops_arg = ops_arg;
489         }
490
491 #ifdef CONFIG_DLM_DEPRECATED_API
492         if (flags & DLM_LSFL_TIMEWARN) {
493                 pr_warn_once("===============================================================\n"
494                              "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
495                              "         will be removed in v6.2!\n"
496                              "         Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
497                              "===============================================================\n");
498
499                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
500         }
501
502         /* ls_exflags are forced to match among nodes, and we don't
503          * need to require all nodes to have some flags set
504          */
505         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
506                                     DLM_LSFL_NEWEXCL));
507 #else
508         /* ls_exflags are forced to match among nodes, and we don't
509          * need to require all nodes to have some flags set
510          */
511         ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
512 #endif
513
514         size = READ_ONCE(dlm_config.ci_rsbtbl_size);
515         ls->ls_rsbtbl_size = size;
516
517         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
518         if (!ls->ls_rsbtbl)
519                 goto out_lsfree;
520         for (i = 0; i < size; i++) {
521                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
522                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
523                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
524         }
525
526         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528                                                  GFP_KERNEL);
529                 if (!ls->ls_remove_names[i])
530                         goto out_rsbtbl;
531         }
532
533         idr_init(&ls->ls_lkbidr);
534         spin_lock_init(&ls->ls_lkbidr_spin);
535
536         INIT_LIST_HEAD(&ls->ls_waiters);
537         mutex_init(&ls->ls_waiters_mutex);
538         INIT_LIST_HEAD(&ls->ls_orphans);
539         mutex_init(&ls->ls_orphans_mutex);
540 #ifdef CONFIG_DLM_DEPRECATED_API
541         INIT_LIST_HEAD(&ls->ls_timeout);
542         mutex_init(&ls->ls_timeout_mutex);
543 #endif
544
545         INIT_LIST_HEAD(&ls->ls_new_rsb);
546         spin_lock_init(&ls->ls_new_rsb_spin);
547
548         INIT_LIST_HEAD(&ls->ls_nodes);
549         INIT_LIST_HEAD(&ls->ls_nodes_gone);
550         ls->ls_num_nodes = 0;
551         ls->ls_low_nodeid = 0;
552         ls->ls_total_weight = 0;
553         ls->ls_node_array = NULL;
554
555         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
556         ls->ls_stub_rsb.res_ls = ls;
557
558         ls->ls_debug_rsb_dentry = NULL;
559         ls->ls_debug_waiters_dentry = NULL;
560
561         init_waitqueue_head(&ls->ls_uevent_wait);
562         ls->ls_uevent_result = 0;
563         init_completion(&ls->ls_recovery_done);
564         ls->ls_recovery_result = -1;
565
566         spin_lock_init(&ls->ls_cb_lock);
567         INIT_LIST_HEAD(&ls->ls_cb_delay);
568
569         ls->ls_recoverd_task = NULL;
570         mutex_init(&ls->ls_recoverd_active);
571         spin_lock_init(&ls->ls_recover_lock);
572         spin_lock_init(&ls->ls_rcom_spin);
573         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
574         ls->ls_recover_status = 0;
575         ls->ls_recover_seq = get_random_u64();
576         ls->ls_recover_args = NULL;
577         init_rwsem(&ls->ls_in_recovery);
578         init_rwsem(&ls->ls_recv_active);
579         INIT_LIST_HEAD(&ls->ls_requestqueue);
580         atomic_set(&ls->ls_requestqueue_cnt, 0);
581         init_waitqueue_head(&ls->ls_requestqueue_wait);
582         mutex_init(&ls->ls_requestqueue_mutex);
583         spin_lock_init(&ls->ls_clear_proc_locks);
584
585         /* Due backwards compatibility with 3.1 we need to use maximum
586          * possible dlm message size to be sure the message will fit and
587          * not having out of bounds issues. However on sending side 3.2
588          * might send less.
589          */
590         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
591         if (!ls->ls_recover_buf)
592                 goto out_lkbidr;
593
594         ls->ls_slot = 0;
595         ls->ls_num_slots = 0;
596         ls->ls_slots_size = 0;
597         ls->ls_slots = NULL;
598
599         INIT_LIST_HEAD(&ls->ls_recover_list);
600         spin_lock_init(&ls->ls_recover_list_lock);
601         idr_init(&ls->ls_recover_idr);
602         spin_lock_init(&ls->ls_recover_idr_lock);
603         ls->ls_recover_list_count = 0;
604         ls->ls_local_handle = ls;
605         init_waitqueue_head(&ls->ls_wait_general);
606         INIT_LIST_HEAD(&ls->ls_root_list);
607         init_rwsem(&ls->ls_root_sem);
608
609         spin_lock(&lslist_lock);
610         ls->ls_create_count = 1;
611         list_add(&ls->ls_list, &lslist);
612         spin_unlock(&lslist_lock);
613
614         if (flags & DLM_LSFL_FS) {
615                 error = dlm_callback_start(ls);
616                 if (error) {
617                         log_error(ls, "can't start dlm_callback %d", error);
618                         goto out_delist;
619                 }
620         }
621
622         init_waitqueue_head(&ls->ls_recover_lock_wait);
623
624         /*
625          * Once started, dlm_recoverd first looks for ls in lslist, then
626          * initializes ls_in_recovery as locked in "down" mode.  We need
627          * to wait for the wakeup from dlm_recoverd because in_recovery
628          * has to start out in down mode.
629          */
630
631         error = dlm_recoverd_start(ls);
632         if (error) {
633                 log_error(ls, "can't start dlm_recoverd %d", error);
634                 goto out_callback;
635         }
636
637         wait_event(ls->ls_recover_lock_wait,
638                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
639
640         /* let kobject handle freeing of ls if there's an error */
641         do_unreg = 1;
642
643         ls->ls_kobj.kset = dlm_kset;
644         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
645                                      "%s", ls->ls_name);
646         if (error)
647                 goto out_recoverd;
648         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
649
650         /* This uevent triggers dlm_controld in userspace to add us to the
651            group of nodes that are members of this lockspace (managed by the
652            cluster infrastructure.)  Once it's done that, it tells us who the
653            current lockspace members are (via configfs) and then tells the
654            lockspace to start running (via sysfs) in dlm_ls_start(). */
655
656         error = do_uevent(ls, 1);
657         if (error)
658                 goto out_recoverd;
659
660         /* wait until recovery is successful or failed */
661         wait_for_completion(&ls->ls_recovery_done);
662         error = ls->ls_recovery_result;
663         if (error)
664                 goto out_members;
665
666         dlm_create_debug_file(ls);
667
668         log_rinfo(ls, "join complete");
669         *lockspace = ls;
670         return 0;
671
672  out_members:
673         do_uevent(ls, 0);
674         dlm_clear_members(ls);
675         kfree(ls->ls_node_array);
676  out_recoverd:
677         dlm_recoverd_stop(ls);
678  out_callback:
679         dlm_callback_stop(ls);
680  out_delist:
681         spin_lock(&lslist_lock);
682         list_del(&ls->ls_list);
683         spin_unlock(&lslist_lock);
684         idr_destroy(&ls->ls_recover_idr);
685         kfree(ls->ls_recover_buf);
686  out_lkbidr:
687         idr_destroy(&ls->ls_lkbidr);
688  out_rsbtbl:
689         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
690                 kfree(ls->ls_remove_names[i]);
691         vfree(ls->ls_rsbtbl);
692  out_lsfree:
693         if (do_unreg)
694                 kobject_put(&ls->ls_kobj);
695         else
696                 kfree(ls);
697  out:
698         module_put(THIS_MODULE);
699         return error;
700 }
701
702 static int __dlm_new_lockspace(const char *name, const char *cluster,
703                                uint32_t flags, int lvblen,
704                                const struct dlm_lockspace_ops *ops,
705                                void *ops_arg, int *ops_result,
706                                dlm_lockspace_t **lockspace)
707 {
708         int error = 0;
709
710         mutex_lock(&ls_lock);
711         if (!ls_count)
712                 error = threads_start();
713         if (error)
714                 goto out;
715
716         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
717                               ops_result, lockspace);
718         if (!error)
719                 ls_count++;
720         if (error > 0)
721                 error = 0;
722         if (!ls_count) {
723                 dlm_scand_stop();
724                 dlm_midcomms_shutdown();
725                 dlm_midcomms_stop();
726         }
727  out:
728         mutex_unlock(&ls_lock);
729         return error;
730 }
731
732 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
733                       int lvblen, const struct dlm_lockspace_ops *ops,
734                       void *ops_arg, int *ops_result,
735                       dlm_lockspace_t **lockspace)
736 {
737         return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
738                                    ops, ops_arg, ops_result, lockspace);
739 }
740
741 int dlm_new_user_lockspace(const char *name, const char *cluster,
742                            uint32_t flags, int lvblen,
743                            const struct dlm_lockspace_ops *ops,
744                            void *ops_arg, int *ops_result,
745                            dlm_lockspace_t **lockspace)
746 {
747         return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
748                                    ops_arg, ops_result, lockspace);
749 }
750
751 static int lkb_idr_is_local(int id, void *p, void *data)
752 {
753         struct dlm_lkb *lkb = p;
754
755         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
756 }
757
/*
 * idr_for_each() callback that matches every entry: always returns 1, so
 * the iteration reports non-zero as soon as the idr holds anything.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	(void)id;
	(void)p;
	(void)data;

	return 1;
}
762
763 static int lkb_idr_free(int id, void *p, void *data)
764 {
765         struct dlm_lkb *lkb = p;
766
767         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
768                 dlm_free_lvb(lkb->lkb_lvbptr);
769
770         dlm_free_lkb(lkb);
771         return 0;
772 }
773
774 /* NOTE: We check the lkbidr here rather than the resource table.
775    This is because there may be LKBs queued as ASTs that have been unlinked
776    from their RSBs and are pending deletion once the AST has been delivered */
777
778 static int lockspace_busy(struct dlm_ls *ls, int force)
779 {
780         int rv;
781
782         spin_lock(&ls->ls_lkbidr_spin);
783         if (force == 0) {
784                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
785         } else if (force == 1) {
786                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
787         } else {
788                 rv = 0;
789         }
790         spin_unlock(&ls->ls_lkbidr_spin);
791         return rv;
792 }
793
794 static int release_lockspace(struct dlm_ls *ls, int force)
795 {
796         struct dlm_rsb *rsb;
797         struct rb_node *n;
798         int i, busy, rv;
799
800         busy = lockspace_busy(ls, force);
801
802         spin_lock(&lslist_lock);
803         if (ls->ls_create_count == 1) {
804                 if (busy) {
805                         rv = -EBUSY;
806                 } else {
807                         /* remove_lockspace takes ls off lslist */
808                         ls->ls_create_count = 0;
809                         rv = 0;
810                 }
811         } else if (ls->ls_create_count > 1) {
812                 rv = --ls->ls_create_count;
813         } else {
814                 rv = -EINVAL;
815         }
816         spin_unlock(&lslist_lock);
817
818         if (rv) {
819                 log_debug(ls, "release_lockspace no remove %d", rv);
820                 return rv;
821         }
822
823         if (ls_count == 1)
824                 dlm_midcomms_version_wait();
825
826         dlm_device_deregister(ls);
827
828         if (force < 3 && dlm_user_daemon_available())
829                 do_uevent(ls, 0);
830
831         dlm_recoverd_stop(ls);
832
833         if (ls_count == 1) {
834                 dlm_scand_stop();
835                 dlm_clear_members(ls);
836                 dlm_midcomms_shutdown();
837         }
838
839         dlm_callback_stop(ls);
840
841         remove_lockspace(ls);
842
843         dlm_delete_debug_file(ls);
844
845         idr_destroy(&ls->ls_recover_idr);
846         kfree(ls->ls_recover_buf);
847
848         /*
849          * Free all lkb's in idr
850          */
851
852         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
853         idr_destroy(&ls->ls_lkbidr);
854
855         /*
856          * Free all rsb's on rsbtbl[] lists
857          */
858
859         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
860                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
861                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
862                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
863                         dlm_free_rsb(rsb);
864                 }
865
866                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
867                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
868                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
869                         dlm_free_rsb(rsb);
870                 }
871         }
872
873         vfree(ls->ls_rsbtbl);
874
875         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
876                 kfree(ls->ls_remove_names[i]);
877
878         while (!list_empty(&ls->ls_new_rsb)) {
879                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
880                                        res_hashchain);
881                 list_del(&rsb->res_hashchain);
882                 dlm_free_rsb(rsb);
883         }
884
885         /*
886          * Free structures on any other lists
887          */
888
889         dlm_purge_requestqueue(ls);
890         kfree(ls->ls_recover_args);
891         dlm_clear_members(ls);
892         dlm_clear_members_gone(ls);
893         kfree(ls->ls_node_array);
894         log_rinfo(ls, "release_lockspace final free");
895         kobject_put(&ls->ls_kobj);
896         /* The ls structure will be freed when the kobject is done with */
897
898         module_put(THIS_MODULE);
899         return 0;
900 }
901
902 /*
903  * Called when a system has released all its locks and is not going to use the
904  * lockspace any longer.  We free everything we're managing for this lockspace.
905  * Remaining nodes will go through the recovery process as if we'd died.  The
906  * lockspace must continue to function as usual, participating in recoveries,
907  * until this returns.
908  *
909  * Force has 4 possible values:
910  * 0 - don't destroy lockspace if it has any LKBs
911  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
912  * 2 - destroy lockspace regardless of LKBs
913  * 3 - destroy lockspace as part of a forced shutdown
914  */
915
916 int dlm_release_lockspace(void *lockspace, int force)
917 {
918         struct dlm_ls *ls;
919         int error;
920
921         ls = dlm_find_lockspace_local(lockspace);
922         if (!ls)
923                 return -EINVAL;
924         dlm_put_lockspace(ls);
925
926         mutex_lock(&ls_lock);
927         error = release_lockspace(ls, force);
928         if (!error)
929                 ls_count--;
930         if (!ls_count)
931                 dlm_midcomms_stop();
932         mutex_unlock(&ls_lock);
933
934         return error;
935 }
936
937 void dlm_stop_lockspaces(void)
938 {
939         struct dlm_ls *ls;
940         int count;
941
942  restart:
943         count = 0;
944         spin_lock(&lslist_lock);
945         list_for_each_entry(ls, &lslist, ls_list) {
946                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
947                         count++;
948                         continue;
949                 }
950                 spin_unlock(&lslist_lock);
951                 log_error(ls, "no userland control daemon, stopping lockspace");
952                 dlm_ls_stop(ls);
953                 goto restart;
954         }
955         spin_unlock(&lslist_lock);
956
957         if (count)
958                 log_print("dlm user daemon left %d lockspaces", count);
959 }
960
961 void dlm_stop_lockspaces_check(void)
962 {
963         struct dlm_ls *ls;
964
965         spin_lock(&lslist_lock);
966         list_for_each_entry(ls, &lslist, ls_list) {
967                 if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
968                             !dlm_locking_stopped(ls)))
969                         break;
970         }
971         spin_unlock(&lslist_lock);
972 }