overlayfs: Implement splice-read
[linux-block.git] / fs / dlm / lockspace.c
CommitLineData
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/
11
7963b8a5
PG
12#include <linux/module.h>
13
e7fd4179
DT
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "recoverd.h"
e7fd4179 18#include "dir.h"
a070a91c 19#include "midcomms.h"
e7fd4179
DT
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
c56b39cd 23#include "recover.h"
2896ee37 24#include "requestqueue.h"
0f8e0d9a 25#include "user.h"
23e8e1aa 26#include "ast.h"
e7fd4179 27
e7fd4179 28static int ls_count;
90135925 29static struct mutex ls_lock;
e7fd4179
DT
30static struct list_head lslist;
31static spinlock_t lslist_lock;
32static struct task_struct * scand_task;
33
34
35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36{
37 ssize_t ret = len;
6edb5687
FF
38 int n;
39 int rc = kstrtoint(buf, 0, &n);
e7fd4179 40
6edb5687
FF
41 if (rc)
42 return rc;
e2de7f56
PC
43 ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 if (!ls)
45 return -EINVAL;
46
e7fd4179
DT
47 switch (n) {
48 case 0:
49 dlm_ls_stop(ls);
50 break;
51 case 1:
52 dlm_ls_start(ls);
53 break;
54 default:
55 ret = -EINVAL;
56 }
e2de7f56 57 dlm_put_lockspace(ls);
e7fd4179
DT
58 return ret;
59}
60
61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62{
6edb5687
FF
63 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65 if (rc)
66 return rc;
e7fd4179
DT
67 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 wake_up(&ls->ls_uevent_wait);
69 return len;
70}
71
72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73{
a1d144c7 74 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
e7fd4179
DT
75}
76
77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78{
6edb5687
FF
79 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81 if (rc)
82 return rc;
e7fd4179
DT
83 return len;
84}
85
4875647a
DT
86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87{
88 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89}
90
91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92{
6edb5687
FF
93 int val;
94 int rc = kstrtoint(buf, 0, &val);
95
96 if (rc)
97 return rc;
4875647a
DT
98 if (val == 1)
99 set_bit(LSFL_NODIR, &ls->ls_flags);
100 return len;
101}
102
c56b39cd
DT
103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104{
105 uint32_t status = dlm_recover_status(ls);
a1d144c7 106 return snprintf(buf, PAGE_SIZE, "%x\n", status);
c56b39cd
DT
107}
108
faa0f267
DT
109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110{
a1d144c7 111 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
faa0f267
DT
112}
113
e7fd4179
DT
114struct dlm_attr {
115 struct attribute attr;
116 ssize_t (*show)(struct dlm_ls *, char *);
117 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118};
119
120static struct dlm_attr dlm_attr_control = {
121 .attr = {.name = "control", .mode = S_IWUSR},
122 .store = dlm_control_store
123};
124
125static struct dlm_attr dlm_attr_event = {
126 .attr = {.name = "event_done", .mode = S_IWUSR},
127 .store = dlm_event_store
128};
129
130static struct dlm_attr dlm_attr_id = {
131 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 .show = dlm_id_show,
133 .store = dlm_id_store
134};
135
4875647a
DT
136static struct dlm_attr dlm_attr_nodir = {
137 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 .show = dlm_nodir_show,
139 .store = dlm_nodir_store
140};
141
c56b39cd
DT
142static struct dlm_attr dlm_attr_recover_status = {
143 .attr = {.name = "recover_status", .mode = S_IRUGO},
144 .show = dlm_recover_status_show
145};
146
faa0f267
DT
147static struct dlm_attr dlm_attr_recover_nodeid = {
148 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
149 .show = dlm_recover_nodeid_show
150};
151
e7fd4179
DT
152static struct attribute *dlm_attrs[] = {
153 &dlm_attr_control.attr,
154 &dlm_attr_event.attr,
155 &dlm_attr_id.attr,
4875647a 156 &dlm_attr_nodir.attr,
c56b39cd 157 &dlm_attr_recover_status.attr,
faa0f267 158 &dlm_attr_recover_nodeid.attr,
e7fd4179
DT
159 NULL,
160};
c9c5b5e1 161ATTRIBUTE_GROUPS(dlm);
e7fd4179
DT
162
163static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 char *buf)
165{
166 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
167 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 return a->show ? a->show(ls, buf) : 0;
169}
170
171static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 const char *buf, size_t len)
173{
174 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
175 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 return a->store ? a->store(ls, buf, len) : len;
177}
178
ba542e3b
PC
179static void lockspace_kobj_release(struct kobject *k)
180{
181 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
182 kfree(ls);
183}
184
52cf25d0 185static const struct sysfs_ops dlm_attr_ops = {
e7fd4179
DT
186 .show = dlm_attr_show,
187 .store = dlm_attr_store,
188};
189
190static struct kobj_type dlm_ktype = {
c9c5b5e1 191 .default_groups = dlm_groups,
e7fd4179 192 .sysfs_ops = &dlm_attr_ops,
ba542e3b 193 .release = lockspace_kobj_release,
e7fd4179
DT
194};
195
/* "dlm" kset created in dlm_lockspace_init(); every lockspace kobject joins it */
static struct kset *dlm_kset;
e7fd4179 197
e7fd4179
DT
198static int do_uevent(struct dlm_ls *ls, int in)
199{
e7fd4179
DT
200 if (in)
201 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 else
203 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
075f0177 205 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
8b0e7b2c
DT
206
207 /* dlm_controld will see the uevent, do the necessary group management
208 and then write to sysfs to wake us */
209
f084a4f4
RL
210 wait_event(ls->ls_uevent_wait,
211 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
8b0e7b2c 212
f084a4f4 213 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
e7fd4179 214
f084a4f4 215 return ls->ls_uevent_result;
e7fd4179
DT
216}
217
56d5f362 218static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
b4a5d4bc 219{
56d5f362 220 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
b4a5d4bc
SW
221
222 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223 return 0;
224}
225
417f7c59 226static const struct kset_uevent_ops dlm_uevent_ops = {
b4a5d4bc
SW
227 .uevent = dlm_uevent,
228};
e7fd4179 229
30727174 230int __init dlm_lockspace_init(void)
e7fd4179 231{
e7fd4179 232 ls_count = 0;
90135925 233 mutex_init(&ls_lock);
e7fd4179
DT
234 INIT_LIST_HEAD(&lslist);
235 spin_lock_init(&lslist_lock);
236
b4a5d4bc 237 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
d405936b 238 if (!dlm_kset) {
8e24eea7 239 printk(KERN_WARNING "%s: can not create kset\n", __func__);
d405936b
GKH
240 return -ENOMEM;
241 }
242 return 0;
e7fd4179
DT
243}
244
245void dlm_lockspace_exit(void)
246{
d405936b 247 kset_unregister(dlm_kset);
e7fd4179
DT
248}
249
c1dcf65f
DT
250static struct dlm_ls *find_ls_to_scan(void)
251{
252 struct dlm_ls *ls;
253
254 spin_lock(&lslist_lock);
255 list_for_each_entry(ls, &lslist, ls_list) {
256 if (time_after_eq(jiffies, ls->ls_scan_time +
257 dlm_config.ci_scan_secs * HZ)) {
258 spin_unlock(&lslist_lock);
259 return ls;
260 }
261 }
262 spin_unlock(&lslist_lock);
263 return NULL;
264}
265
e7fd4179
DT
266static int dlm_scand(void *data)
267{
268 struct dlm_ls *ls;
269
270 while (!kthread_should_stop()) {
c1dcf65f
DT
271 ls = find_ls_to_scan();
272 if (ls) {
85e86edf 273 if (dlm_lock_recovery_try(ls)) {
c1dcf65f 274 ls->ls_scan_time = jiffies;
85e86edf
DT
275 dlm_scan_rsbs(ls);
276 dlm_unlock_recovery(ls);
c1dcf65f
DT
277 } else {
278 ls->ls_scan_time += HZ;
85e86edf 279 }
c6ff669b 280 continue;
85e86edf 281 }
c6ff669b 282 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
e7fd4179
DT
283 }
284 return 0;
285}
286
287static int dlm_scand_start(void)
288{
289 struct task_struct *p;
290 int error = 0;
291
292 p = kthread_run(dlm_scand, NULL, "dlm_scand");
293 if (IS_ERR(p))
294 error = PTR_ERR(p);
295 else
296 scand_task = p;
297 return error;
298}
299
300static void dlm_scand_stop(void)
301{
302 kthread_stop(scand_task);
303}
304
e7fd4179
DT
305struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
306{
307 struct dlm_ls *ls;
308
309 spin_lock(&lslist_lock);
310
311 list_for_each_entry(ls, &lslist, ls_list) {
312 if (ls->ls_global_id == id) {
3cb5977c 313 atomic_inc(&ls->ls_count);
e7fd4179
DT
314 goto out;
315 }
316 }
317 ls = NULL;
318 out:
319 spin_unlock(&lslist_lock);
320 return ls;
321}
322
597d0cae 323struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
e7fd4179 324{
597d0cae 325 struct dlm_ls *ls;
e7fd4179
DT
326
327 spin_lock(&lslist_lock);
597d0cae
DT
328 list_for_each_entry(ls, &lslist, ls_list) {
329 if (ls->ls_local_handle == lockspace) {
3cb5977c 330 atomic_inc(&ls->ls_count);
597d0cae
DT
331 goto out;
332 }
333 }
334 ls = NULL;
335 out:
336 spin_unlock(&lslist_lock);
337 return ls;
338}
339
340struct dlm_ls *dlm_find_lockspace_device(int minor)
341{
342 struct dlm_ls *ls;
343
344 spin_lock(&lslist_lock);
345 list_for_each_entry(ls, &lslist, ls_list) {
346 if (ls->ls_device.minor == minor) {
3cb5977c 347 atomic_inc(&ls->ls_count);
597d0cae
DT
348 goto out;
349 }
350 }
351 ls = NULL;
352 out:
e7fd4179
DT
353 spin_unlock(&lslist_lock);
354 return ls;
355}
356
357void dlm_put_lockspace(struct dlm_ls *ls)
358{
3cb5977c
AA
359 if (atomic_dec_and_test(&ls->ls_count))
360 wake_up(&ls->ls_count_wait);
e7fd4179
DT
361}
362
363static void remove_lockspace(struct dlm_ls *ls)
364{
3cb5977c
AA
365retry:
366 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
367
368 spin_lock(&lslist_lock);
369 if (atomic_read(&ls->ls_count) != 0) {
e7fd4179 370 spin_unlock(&lslist_lock);
3cb5977c 371 goto retry;
e7fd4179 372 }
3cb5977c
AA
373
374 WARN_ON(ls->ls_create_count != 0);
375 list_del(&ls->ls_list);
376 spin_unlock(&lslist_lock);
e7fd4179
DT
377}
378
/*
 * Start the services shared by all lockspaces: midcomms first, then the
 * dlm_scand thread.  If dlm_scand cannot be started midcomms is stopped
 * again.  Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		dlm_midcomms_stop();
		return error;
	}

	return 0;
}
403
60f98d18
DT
404static int new_lockspace(const char *name, const char *cluster,
405 uint32_t flags, int lvblen,
406 const struct dlm_lockspace_ops *ops, void *ops_arg,
407 int *ops_result, dlm_lockspace_t **lockspace)
e7fd4179
DT
408{
409 struct dlm_ls *ls;
0f8e0d9a 410 int i, size, error;
79d72b54 411 int do_unreg = 0;
60f98d18 412 int namelen = strlen(name);
e7fd4179 413
3f0806d2 414 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
e7fd4179
DT
415 return -EINVAL;
416
b5c9d37c 417 if (lvblen % 8)
e7fd4179
DT
418 return -EINVAL;
419
420 if (!try_module_get(THIS_MODULE))
421 return -EINVAL;
422
dc68c7ed 423 if (!dlm_user_daemon_available()) {
60f98d18
DT
424 log_print("dlm user daemon not available");
425 error = -EUNATCH;
426 goto out;
427 }
428
429 if (ops && ops_result) {
430 if (!dlm_config.ci_recover_callbacks)
431 *ops_result = -EOPNOTSUPP;
432 else
433 *ops_result = 0;
434 }
435
3b0e761b
ZL
436 if (!cluster)
437 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
438 dlm_config.ci_cluster_name);
439
60f98d18
DT
440 if (dlm_config.ci_recover_callbacks && cluster &&
441 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
8e174374
GH
442 log_print("dlm cluster name '%s' does not match "
443 "the application cluster name '%s'",
60f98d18
DT
444 dlm_config.ci_cluster_name, cluster);
445 error = -EBADR;
446 goto out;
dc68c7ed
DT
447 }
448
0f8e0d9a
DT
449 error = 0;
450
451 spin_lock(&lslist_lock);
452 list_for_each_entry(ls, &lslist, ls_list) {
453 WARN_ON(ls->ls_create_count <= 0);
454 if (ls->ls_namelen != namelen)
455 continue;
456 if (memcmp(ls->ls_name, name, namelen))
457 continue;
458 if (flags & DLM_LSFL_NEWEXCL) {
459 error = -EEXIST;
460 break;
461 }
462 ls->ls_create_count++;
8511a272
DT
463 *lockspace = ls;
464 error = 1;
0f8e0d9a 465 break;
e7fd4179 466 }
0f8e0d9a
DT
467 spin_unlock(&lslist_lock);
468
0f8e0d9a 469 if (error)
8511a272 470 goto out;
0f8e0d9a
DT
471
472 error = -ENOMEM;
e7fd4179 473
d96d0f96 474 ls = kzalloc(sizeof(*ls), GFP_NOFS);
e7fd4179
DT
475 if (!ls)
476 goto out;
e7fd4179
DT
477 memcpy(ls->ls_name, name, namelen);
478 ls->ls_namelen = namelen;
e7fd4179 479 ls->ls_lvblen = lvblen;
3cb5977c
AA
480 atomic_set(&ls->ls_count, 0);
481 init_waitqueue_head(&ls->ls_count_wait);
e7fd4179 482 ls->ls_flags = 0;
c1dcf65f 483 ls->ls_scan_time = jiffies;
e7fd4179 484
60f98d18
DT
485 if (ops && dlm_config.ci_recover_callbacks) {
486 ls->ls_ops = ops;
487 ls->ls_ops_arg = ops_arg;
488 }
489
6b0afc0c
AA
490 /* ls_exflags are forced to match among nodes, and we don't
491 * need to require all nodes to have some flags set
492 */
493 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
fad59c13 494
d921a23f 495 size = READ_ONCE(dlm_config.ci_rsbtbl_size);
e7fd4179
DT
496 ls->ls_rsbtbl_size = size;
497
42bc47b3 498 ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
e7fd4179
DT
499 if (!ls->ls_rsbtbl)
500 goto out_lsfree;
501 for (i = 0; i < size; i++) {
9beb3bf5
BP
502 ls->ls_rsbtbl[i].keep.rb_node = NULL;
503 ls->ls_rsbtbl[i].toss.rb_node = NULL;
c7be761a 504 spin_lock_init(&ls->ls_rsbtbl[i].lock);
e7fd4179
DT
505 }
506
05c32f47
DT
507 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
508 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
509 GFP_KERNEL);
510 if (!ls->ls_remove_names[i])
511 goto out_rsbtbl;
512 }
513
3d6aa675
DT
514 idr_init(&ls->ls_lkbidr);
515 spin_lock_init(&ls->ls_lkbidr_spin);
e7fd4179 516
e7fd4179 517 INIT_LIST_HEAD(&ls->ls_waiters);
90135925 518 mutex_init(&ls->ls_waiters_mutex);
ef0c2bb0
DT
519 INIT_LIST_HEAD(&ls->ls_orphans);
520 mutex_init(&ls->ls_orphans_mutex);
e7fd4179 521
3881ac04
DT
522 INIT_LIST_HEAD(&ls->ls_new_rsb);
523 spin_lock_init(&ls->ls_new_rsb_spin);
524
e7fd4179
DT
525 INIT_LIST_HEAD(&ls->ls_nodes);
526 INIT_LIST_HEAD(&ls->ls_nodes_gone);
527 ls->ls_num_nodes = 0;
528 ls->ls_low_nodeid = 0;
529 ls->ls_total_weight = 0;
530 ls->ls_node_array = NULL;
531
a7e7ffac
AA
532 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
533 ls->ls_local_rsb.res_ls = ls;
e7fd4179 534
5de6319b
DT
535 ls->ls_debug_rsb_dentry = NULL;
536 ls->ls_debug_waiters_dentry = NULL;
e7fd4179
DT
537
538 init_waitqueue_head(&ls->ls_uevent_wait);
539 ls->ls_uevent_result = 0;
682bb91b
AA
540 init_completion(&ls->ls_recovery_done);
541 ls->ls_recovery_result = -1;
e7fd4179 542
a4c0352b 543 spin_lock_init(&ls->ls_cb_lock);
23e8e1aa
DT
544 INIT_LIST_HEAD(&ls->ls_cb_delay);
545
e7fd4179 546 ls->ls_recoverd_task = NULL;
90135925 547 mutex_init(&ls->ls_recoverd_active);
e7fd4179 548 spin_lock_init(&ls->ls_recover_lock);
98f176fb
DT
549 spin_lock_init(&ls->ls_rcom_spin);
550 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
e7fd4179 551 ls->ls_recover_status = 0;
317dd6ba 552 ls->ls_recover_seq = get_random_u64();
e7fd4179
DT
553 ls->ls_recover_args = NULL;
554 init_rwsem(&ls->ls_in_recovery);
c36258b5 555 init_rwsem(&ls->ls_recv_active);
e7fd4179 556 INIT_LIST_HEAD(&ls->ls_requestqueue);
164d88ab
AA
557 atomic_set(&ls->ls_requestqueue_cnt, 0);
558 init_waitqueue_head(&ls->ls_requestqueue_wait);
90135925 559 mutex_init(&ls->ls_requestqueue_mutex);
296d9d1e 560 spin_lock_init(&ls->ls_clear_proc_locks);
e7fd4179 561
489d8e55
AA
562 /* Due backwards compatibility with 3.1 we need to use maximum
563 * possible dlm message size to be sure the message will fit and
564 * not having out of bounds issues. However on sending side 3.2
565 * might send less.
566 */
d10a0b88 567 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
e7fd4179 568 if (!ls->ls_recover_buf)
05c32f47 569 goto out_lkbidr;
e7fd4179 570
757a4271
DT
571 ls->ls_slot = 0;
572 ls->ls_num_slots = 0;
573 ls->ls_slots_size = 0;
574 ls->ls_slots = NULL;
575
e7fd4179
DT
576 INIT_LIST_HEAD(&ls->ls_recover_list);
577 spin_lock_init(&ls->ls_recover_list_lock);
1d7c484e
DT
578 idr_init(&ls->ls_recover_idr);
579 spin_lock_init(&ls->ls_recover_idr_lock);
e7fd4179 580 ls->ls_recover_list_count = 0;
597d0cae 581 ls->ls_local_handle = ls;
e7fd4179
DT
582 init_waitqueue_head(&ls->ls_wait_general);
583 INIT_LIST_HEAD(&ls->ls_root_list);
584 init_rwsem(&ls->ls_root_sem);
585
5f88f1ea 586 spin_lock(&lslist_lock);
0f8e0d9a 587 ls->ls_create_count = 1;
5f88f1ea
DT
588 list_add(&ls->ls_list, &lslist);
589 spin_unlock(&lslist_lock);
590
23e8e1aa
DT
591 if (flags & DLM_LSFL_FS) {
592 error = dlm_callback_start(ls);
593 if (error) {
594 log_error(ls, "can't start dlm_callback %d", error);
595 goto out_delist;
596 }
597 }
598
475f230c
DT
599 init_waitqueue_head(&ls->ls_recover_lock_wait);
600
601 /*
602 * Once started, dlm_recoverd first looks for ls in lslist, then
603 * initializes ls_in_recovery as locked in "down" mode. We need
604 * to wait for the wakeup from dlm_recoverd because in_recovery
605 * has to start out in down mode.
606 */
607
e7fd4179
DT
608 error = dlm_recoverd_start(ls);
609 if (error) {
610 log_error(ls, "can't start dlm_recoverd %d", error);
23e8e1aa 611 goto out_callback;
e7fd4179
DT
612 }
613
475f230c
DT
614 wait_event(ls->ls_recover_lock_wait,
615 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
616
0ffddafc
WH
617 /* let kobject handle freeing of ls if there's an error */
618 do_unreg = 1;
619
901195ed
GKH
620 ls->ls_kobj.kset = dlm_kset;
621 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
622 "%s", ls->ls_name);
e7fd4179 623 if (error)
23e8e1aa 624 goto out_recoverd;
901195ed 625 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
79d72b54 626
8b0e7b2c
DT
627 /* This uevent triggers dlm_controld in userspace to add us to the
628 group of nodes that are members of this lockspace (managed by the
629 cluster infrastructure.) Once it's done that, it tells us who the
630 current lockspace members are (via configfs) and then tells the
631 lockspace to start running (via sysfs) in dlm_ls_start(). */
632
e7fd4179
DT
633 error = do_uevent(ls, 1);
634 if (error)
23e8e1aa 635 goto out_recoverd;
79d72b54 636
682bb91b
AA
637 /* wait until recovery is successful or failed */
638 wait_for_completion(&ls->ls_recovery_done);
639 error = ls->ls_recovery_result;
8b0e7b2c
DT
640 if (error)
641 goto out_members;
642
79d72b54
DT
643 dlm_create_debug_file(ls);
644
075f0177 645 log_rinfo(ls, "join complete");
e7fd4179
DT
646 *lockspace = ls;
647 return 0;
648
8b0e7b2c
DT
649 out_members:
650 do_uevent(ls, 0);
651 dlm_clear_members(ls);
652 kfree(ls->ls_node_array);
23e8e1aa 653 out_recoverd:
5f88f1ea 654 dlm_recoverd_stop(ls);
23e8e1aa
DT
655 out_callback:
656 dlm_callback_stop(ls);
79d72b54 657 out_delist:
e7fd4179
DT
658 spin_lock(&lslist_lock);
659 list_del(&ls->ls_list);
660 spin_unlock(&lslist_lock);
1d7c484e 661 idr_destroy(&ls->ls_recover_idr);
e7fd4179 662 kfree(ls->ls_recover_buf);
05c32f47 663 out_lkbidr:
3d6aa675 664 idr_destroy(&ls->ls_lkbidr);
b982896c 665 out_rsbtbl:
3456880f
TM
666 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
667 kfree(ls->ls_remove_names[i]);
c282af49 668 vfree(ls->ls_rsbtbl);
e7fd4179 669 out_lsfree:
79d72b54 670 if (do_unreg)
197b12d6 671 kobject_put(&ls->ls_kobj);
79d72b54
DT
672 else
673 kfree(ls);
e7fd4179
DT
674 out:
675 module_put(THIS_MODULE);
676 return error;
677}
678
12cda13c
AA
679static int __dlm_new_lockspace(const char *name, const char *cluster,
680 uint32_t flags, int lvblen,
681 const struct dlm_lockspace_ops *ops,
682 void *ops_arg, int *ops_result,
683 dlm_lockspace_t **lockspace)
e7fd4179
DT
684{
685 int error = 0;
686
90135925 687 mutex_lock(&ls_lock);
e7fd4179
DT
688 if (!ls_count)
689 error = threads_start();
690 if (error)
691 goto out;
692
60f98d18
DT
693 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
694 ops_result, lockspace);
e7fd4179
DT
695 if (!error)
696 ls_count++;
8511a272
DT
697 if (error > 0)
698 error = 0;
9d232469
AA
699 if (!ls_count) {
700 dlm_scand_stop();
a070a91c 701 dlm_midcomms_shutdown();
8b0188b0 702 dlm_midcomms_stop();
9d232469 703 }
e7fd4179 704 out:
90135925 705 mutex_unlock(&ls_lock);
e7fd4179
DT
706 return error;
707}
708
12cda13c
AA
709int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
710 int lvblen, const struct dlm_lockspace_ops *ops,
711 void *ops_arg, int *ops_result,
712 dlm_lockspace_t **lockspace)
713{
714 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
715 ops, ops_arg, ops_result, lockspace);
716}
717
718int dlm_new_user_lockspace(const char *name, const char *cluster,
719 uint32_t flags, int lvblen,
720 const struct dlm_lockspace_ops *ops,
721 void *ops_arg, int *ops_result,
722 dlm_lockspace_t **lockspace)
723{
724 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
725 ops_arg, ops_result, lockspace);
726}
727
3d6aa675 728static int lkb_idr_is_local(int id, void *p, void *data)
e7fd4179 729{
3d6aa675
DT
730 struct dlm_lkb *lkb = p;
731
a97f4a66 732 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
3d6aa675
DT
733}
734
/*
 * idr_for_each() callback that matches every entry: returning 1 stops the
 * walk at the first lkb, so the walk yields 1 iff the idr is non-empty.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
739
740static int lkb_idr_free(int id, void *p, void *data)
741{
742 struct dlm_lkb *lkb = p;
743
e1af8728 744 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
3d6aa675
DT
745 dlm_free_lvb(lkb->lkb_lvbptr);
746
747 dlm_free_lkb(lkb);
748 return 0;
749}
750
751/* NOTE: We check the lkbidr here rather than the resource table.
752 This is because there may be LKBs queued as ASTs that have been unlinked
753 from their RSBs and are pending deletion once the AST has been delivered */
754
755static int lockspace_busy(struct dlm_ls *ls, int force)
756{
757 int rv;
758
759 spin_lock(&ls->ls_lkbidr_spin);
760 if (force == 0) {
761 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
762 } else if (force == 1) {
763 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
764 } else {
765 rv = 0;
e7fd4179 766 }
3d6aa675
DT
767 spin_unlock(&ls->ls_lkbidr_spin);
768 return rv;
e7fd4179
DT
769}
770
771static int release_lockspace(struct dlm_ls *ls, int force)
772{
e7fd4179 773 struct dlm_rsb *rsb;
9beb3bf5 774 struct rb_node *n;
0f8e0d9a
DT
775 int i, busy, rv;
776
3d6aa675 777 busy = lockspace_busy(ls, force);
0f8e0d9a
DT
778
779 spin_lock(&lslist_lock);
780 if (ls->ls_create_count == 1) {
3d6aa675 781 if (busy) {
0f8e0d9a 782 rv = -EBUSY;
3d6aa675 783 } else {
0f8e0d9a
DT
784 /* remove_lockspace takes ls off lslist */
785 ls->ls_create_count = 0;
786 rv = 0;
787 }
788 } else if (ls->ls_create_count > 1) {
789 rv = --ls->ls_create_count;
790 } else {
791 rv = -EINVAL;
792 }
793 spin_unlock(&lslist_lock);
794
795 if (rv) {
796 log_debug(ls, "release_lockspace no remove %d", rv);
797 return rv;
798 }
e7fd4179 799
b8b750e0
AA
800 if (ls_count == 1)
801 dlm_midcomms_version_wait();
802
0f8e0d9a 803 dlm_device_deregister(ls);
e7fd4179 804
dc68c7ed 805 if (force < 3 && dlm_user_daemon_available())
e7fd4179
DT
806 do_uevent(ls, 0);
807
808 dlm_recoverd_stop(ls);
809
9d232469
AA
810 if (ls_count == 1) {
811 dlm_scand_stop();
ecd95673 812 dlm_clear_members(ls);
a070a91c 813 dlm_midcomms_shutdown();
9d232469
AA
814 }
815
23e8e1aa
DT
816 dlm_callback_stop(ls);
817
e7fd4179
DT
818 remove_lockspace(ls);
819
820 dlm_delete_debug_file(ls);
821
8fc6ed9a 822 idr_destroy(&ls->ls_recover_idr);
e7fd4179
DT
823 kfree(ls->ls_recover_buf);
824
e7fd4179 825 /*
3d6aa675 826 * Free all lkb's in idr
e7fd4179
DT
827 */
828
3d6aa675 829 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
3d6aa675 830 idr_destroy(&ls->ls_lkbidr);
e7fd4179 831
e7fd4179
DT
832 /*
833 * Free all rsb's on rsbtbl[] lists
834 */
835
836 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
9beb3bf5
BP
837 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
838 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
839 rb_erase(n, &ls->ls_rsbtbl[i].keep);
52bda2b5 840 dlm_free_rsb(rsb);
e7fd4179
DT
841 }
842
9beb3bf5
BP
843 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
844 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
845 rb_erase(n, &ls->ls_rsbtbl[i].toss);
52bda2b5 846 dlm_free_rsb(rsb);
e7fd4179
DT
847 }
848 }
849
c282af49 850 vfree(ls->ls_rsbtbl);
e7fd4179 851
05c32f47
DT
852 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
853 kfree(ls->ls_remove_names[i]);
854
3881ac04
DT
855 while (!list_empty(&ls->ls_new_rsb)) {
856 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
857 res_hashchain);
858 list_del(&rsb->res_hashchain);
859 dlm_free_rsb(rsb);
860 }
861
e7fd4179
DT
862 /*
863 * Free structures on any other lists
864 */
865
2896ee37 866 dlm_purge_requestqueue(ls);
e7fd4179 867 kfree(ls->ls_recover_args);
e7fd4179
DT
868 dlm_clear_members(ls);
869 dlm_clear_members_gone(ls);
870 kfree(ls->ls_node_array);
075f0177 871 log_rinfo(ls, "release_lockspace final free");
197b12d6 872 kobject_put(&ls->ls_kobj);
79d72b54 873 /* The ls structure will be freed when the kobject is done with */
e7fd4179 874
e7fd4179
DT
875 module_put(THIS_MODULE);
876 return 0;
877}
878
879/*
880 * Called when a system has released all its locks and is not going to use the
881 * lockspace any longer. We free everything we're managing for this lockspace.
882 * Remaining nodes will go through the recovery process as if we'd died. The
883 * lockspace must continue to function as usual, participating in recoveries,
884 * until this returns.
885 *
886 * Force has 4 possible values:
bb6866a5 887 * 0 - don't destroy lockspace if it has any LKBs
e7fd4179
DT
888 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
889 * 2 - destroy lockspace regardless of LKBs
890 * 3 - destroy lockspace as part of a forced shutdown
891 */
892
893int dlm_release_lockspace(void *lockspace, int force)
894{
895 struct dlm_ls *ls;
0f8e0d9a 896 int error;
e7fd4179
DT
897
898 ls = dlm_find_lockspace_local(lockspace);
899 if (!ls)
900 return -EINVAL;
901 dlm_put_lockspace(ls);
0f8e0d9a
DT
902
903 mutex_lock(&ls_lock);
904 error = release_lockspace(ls, force);
905 if (!error)
906 ls_count--;
278afcbf 907 if (!ls_count)
8b0188b0 908 dlm_midcomms_stop();
0f8e0d9a
DT
909 mutex_unlock(&ls_lock);
910
911 return error;
e7fd4179
DT
912}
913
dc68c7ed
DT
914void dlm_stop_lockspaces(void)
915{
916 struct dlm_ls *ls;
696b3d84 917 int count;
dc68c7ed
DT
918
919 restart:
696b3d84 920 count = 0;
dc68c7ed
DT
921 spin_lock(&lslist_lock);
922 list_for_each_entry(ls, &lslist, ls_list) {
696b3d84
DT
923 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
924 count++;
dc68c7ed 925 continue;
696b3d84 926 }
dc68c7ed
DT
927 spin_unlock(&lslist_lock);
928 log_error(ls, "no userland control daemon, stopping lockspace");
929 dlm_ls_stop(ls);
930 goto restart;
931 }
932 spin_unlock(&lslist_lock);
696b3d84
DT
933
934 if (count)
935 log_print("dlm user daemon left %d lockspaces", count);
dc68c7ed
DT
936}
937
2c3fa6ae
AA
938void dlm_stop_lockspaces_check(void)
939{
940 struct dlm_ls *ls;
941
942 spin_lock(&lslist_lock);
943 list_for_each_entry(ls, &lslist, ls_list) {
944 if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
945 !dlm_locking_stopped(ls)))
946 break;
947 }
948 spin_unlock(&lslist_lock);
949}