[DLM] The core of the DLM for GFS2/CLVM
[linux-2.6-block.git] / fs / dlm / lockspace.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "recoverd.h"
18#include "ast.h"
19#include "dir.h"
20#include "lowcomms.h"
21#include "config.h"
22#include "memory.h"
23#include "lock.h"
24
25#ifdef CONFIG_DLM_DEBUG
26int dlm_create_debug_file(struct dlm_ls *ls);
27void dlm_delete_debug_file(struct dlm_ls *ls);
28#else
29static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
30static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
31#endif
32
33static int ls_count;
34static struct semaphore ls_lock;
35static struct list_head lslist;
36static spinlock_t lslist_lock;
37static struct task_struct * scand_task;
38
39
40static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
41{
42 ssize_t ret = len;
43 int n = simple_strtol(buf, NULL, 0);
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 return ret;
56}
57
58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59{
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
64}
65
66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67{
68 return sprintf(buf, "%u\n", ls->ls_global_id);
69}
70
71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72{
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
75}
76
77struct dlm_attr {
78 struct attribute attr;
79 ssize_t (*show)(struct dlm_ls *, char *);
80 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
81};
82
83static struct dlm_attr dlm_attr_control = {
84 .attr = {.name = "control", .mode = S_IWUSR},
85 .store = dlm_control_store
86};
87
88static struct dlm_attr dlm_attr_event = {
89 .attr = {.name = "event_done", .mode = S_IWUSR},
90 .store = dlm_event_store
91};
92
93static struct dlm_attr dlm_attr_id = {
94 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
95 .show = dlm_id_show,
96 .store = dlm_id_store
97};
98
99static struct attribute *dlm_attrs[] = {
100 &dlm_attr_control.attr,
101 &dlm_attr_event.attr,
102 &dlm_attr_id.attr,
103 NULL,
104};
105
106static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
107 char *buf)
108{
109 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
110 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
111 return a->show ? a->show(ls, buf) : 0;
112}
113
114static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
115 const char *buf, size_t len)
116{
117 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
118 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
119 return a->store ? a->store(ls, buf, len) : len;
120}
121
122static struct sysfs_ops dlm_attr_ops = {
123 .show = dlm_attr_show,
124 .store = dlm_attr_store,
125};
126
127static struct kobj_type dlm_ktype = {
128 .default_attrs = dlm_attrs,
129 .sysfs_ops = &dlm_attr_ops,
130};
131
132static struct kset dlm_kset = {
133 .subsys = &kernel_subsys,
134 .kobj = {.name = "dlm",},
135 .ktype = &dlm_ktype,
136};
137
138static int kobject_setup(struct dlm_ls *ls)
139{
140 char lsname[DLM_LOCKSPACE_LEN];
141 int error;
142
143 memset(lsname, 0, DLM_LOCKSPACE_LEN);
144 snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
145
146 error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
147 if (error)
148 return error;
149
150 ls->ls_kobj.kset = &dlm_kset;
151 ls->ls_kobj.ktype = &dlm_ktype;
152 return 0;
153}
154
155static int do_uevent(struct dlm_ls *ls, int in)
156{
157 int error;
158
159 if (in)
160 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
161 else
162 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
163
164 error = wait_event_interruptible(ls->ls_uevent_wait,
165 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
166 if (error)
167 goto out;
168
169 error = ls->ls_uevent_result;
170 out:
171 return error;
172}
173
174
175int dlm_lockspace_init(void)
176{
177 int error;
178
179 ls_count = 0;
180 init_MUTEX(&ls_lock);
181 INIT_LIST_HEAD(&lslist);
182 spin_lock_init(&lslist_lock);
183
184 error = kset_register(&dlm_kset);
185 if (error)
186 printk("dlm_lockspace_init: cannot register kset %d\n", error);
187 return error;
188}
189
190void dlm_lockspace_exit(void)
191{
192 kset_unregister(&dlm_kset);
193}
194
195static int dlm_scand(void *data)
196{
197 struct dlm_ls *ls;
198
199 while (!kthread_should_stop()) {
200 list_for_each_entry(ls, &lslist, ls_list)
201 dlm_scan_rsbs(ls);
202 schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
203 }
204 return 0;
205}
206
207static int dlm_scand_start(void)
208{
209 struct task_struct *p;
210 int error = 0;
211
212 p = kthread_run(dlm_scand, NULL, "dlm_scand");
213 if (IS_ERR(p))
214 error = PTR_ERR(p);
215 else
216 scand_task = p;
217 return error;
218}
219
220static void dlm_scand_stop(void)
221{
222 kthread_stop(scand_task);
223}
224
225static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
226{
227 struct dlm_ls *ls;
228
229 spin_lock(&lslist_lock);
230
231 list_for_each_entry(ls, &lslist, ls_list) {
232 if (ls->ls_namelen == namelen &&
233 memcmp(ls->ls_name, name, namelen) == 0)
234 goto out;
235 }
236 ls = NULL;
237 out:
238 spin_unlock(&lslist_lock);
239 return ls;
240}
241
242struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
243{
244 struct dlm_ls *ls;
245
246 spin_lock(&lslist_lock);
247
248 list_for_each_entry(ls, &lslist, ls_list) {
249 if (ls->ls_global_id == id) {
250 ls->ls_count++;
251 goto out;
252 }
253 }
254 ls = NULL;
255 out:
256 spin_unlock(&lslist_lock);
257 return ls;
258}
259
260struct dlm_ls *dlm_find_lockspace_local(void *id)
261{
262 struct dlm_ls *ls = id;
263
264 spin_lock(&lslist_lock);
265 ls->ls_count++;
266 spin_unlock(&lslist_lock);
267 return ls;
268}
269
270void dlm_put_lockspace(struct dlm_ls *ls)
271{
272 spin_lock(&lslist_lock);
273 ls->ls_count--;
274 spin_unlock(&lslist_lock);
275}
276
277static void remove_lockspace(struct dlm_ls *ls)
278{
279 for (;;) {
280 spin_lock(&lslist_lock);
281 if (ls->ls_count == 0) {
282 list_del(&ls->ls_list);
283 spin_unlock(&lslist_lock);
284 return;
285 }
286 spin_unlock(&lslist_lock);
287 ssleep(1);
288 }
289}
290
291static int threads_start(void)
292{
293 int error;
294
295 /* Thread which process lock requests for all lockspace's */
296 error = dlm_astd_start();
297 if (error) {
298 log_print("cannot start dlm_astd thread %d", error);
299 goto fail;
300 }
301
302 error = dlm_scand_start();
303 if (error) {
304 log_print("cannot start dlm_scand thread %d", error);
305 goto astd_fail;
306 }
307
308 /* Thread for sending/receiving messages for all lockspace's */
309 error = dlm_lowcomms_start();
310 if (error) {
311 log_print("cannot start dlm lowcomms %d", error);
312 goto scand_fail;
313 }
314
315 return 0;
316
317 scand_fail:
318 dlm_scand_stop();
319 astd_fail:
320 dlm_astd_stop();
321 fail:
322 return error;
323}
324
325static void threads_stop(void)
326{
327 dlm_scand_stop();
328 dlm_lowcomms_stop();
329 dlm_astd_stop();
330}
331
332static int new_lockspace(char *name, int namelen, void **lockspace,
333 uint32_t flags, int lvblen)
334{
335 struct dlm_ls *ls;
336 int i, size, error = -ENOMEM;
337
338 if (namelen > DLM_LOCKSPACE_LEN)
339 return -EINVAL;
340
341 if (!lvblen || (lvblen % 8))
342 return -EINVAL;
343
344 if (!try_module_get(THIS_MODULE))
345 return -EINVAL;
346
347 ls = dlm_find_lockspace_name(name, namelen);
348 if (ls) {
349 *lockspace = ls;
350 module_put(THIS_MODULE);
351 return -EEXIST;
352 }
353
354 ls = kmalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
355 if (!ls)
356 goto out;
357 memset(ls, 0, sizeof(struct dlm_ls) + namelen);
358 memcpy(ls->ls_name, name, namelen);
359 ls->ls_namelen = namelen;
360 ls->ls_exflags = flags;
361 ls->ls_lvblen = lvblen;
362 ls->ls_count = 0;
363 ls->ls_flags = 0;
364
365 size = dlm_config.rsbtbl_size;
366 ls->ls_rsbtbl_size = size;
367
368 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
369 if (!ls->ls_rsbtbl)
370 goto out_lsfree;
371 for (i = 0; i < size; i++) {
372 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
373 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
374 rwlock_init(&ls->ls_rsbtbl[i].lock);
375 }
376
377 size = dlm_config.lkbtbl_size;
378 ls->ls_lkbtbl_size = size;
379
380 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
381 if (!ls->ls_lkbtbl)
382 goto out_rsbfree;
383 for (i = 0; i < size; i++) {
384 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
385 rwlock_init(&ls->ls_lkbtbl[i].lock);
386 ls->ls_lkbtbl[i].counter = 1;
387 }
388
389 size = dlm_config.dirtbl_size;
390 ls->ls_dirtbl_size = size;
391
392 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
393 if (!ls->ls_dirtbl)
394 goto out_lkbfree;
395 for (i = 0; i < size; i++) {
396 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
397 rwlock_init(&ls->ls_dirtbl[i].lock);
398 }
399
400 INIT_LIST_HEAD(&ls->ls_waiters);
401 init_MUTEX(&ls->ls_waiters_sem);
402
403 INIT_LIST_HEAD(&ls->ls_nodes);
404 INIT_LIST_HEAD(&ls->ls_nodes_gone);
405 ls->ls_num_nodes = 0;
406 ls->ls_low_nodeid = 0;
407 ls->ls_total_weight = 0;
408 ls->ls_node_array = NULL;
409
410 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
411 ls->ls_stub_rsb.res_ls = ls;
412
413 ls->ls_debug_dentry = NULL;
414
415 init_waitqueue_head(&ls->ls_uevent_wait);
416 ls->ls_uevent_result = 0;
417
418 ls->ls_recoverd_task = NULL;
419 init_MUTEX(&ls->ls_recoverd_active);
420 spin_lock_init(&ls->ls_recover_lock);
421 ls->ls_recover_status = 0;
422 ls->ls_recover_seq = 0;
423 ls->ls_recover_args = NULL;
424 init_rwsem(&ls->ls_in_recovery);
425 INIT_LIST_HEAD(&ls->ls_requestqueue);
426 init_MUTEX(&ls->ls_requestqueue_lock);
427
428 ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
429 if (!ls->ls_recover_buf)
430 goto out_dirfree;
431
432 INIT_LIST_HEAD(&ls->ls_recover_list);
433 spin_lock_init(&ls->ls_recover_list_lock);
434 ls->ls_recover_list_count = 0;
435 init_waitqueue_head(&ls->ls_wait_general);
436 INIT_LIST_HEAD(&ls->ls_root_list);
437 init_rwsem(&ls->ls_root_sem);
438
439 down_write(&ls->ls_in_recovery);
440
441 error = dlm_recoverd_start(ls);
442 if (error) {
443 log_error(ls, "can't start dlm_recoverd %d", error);
444 goto out_rcomfree;
445 }
446
447 spin_lock(&lslist_lock);
448 list_add(&ls->ls_list, &lslist);
449 spin_unlock(&lslist_lock);
450
451 dlm_create_debug_file(ls);
452
453 error = kobject_setup(ls);
454 if (error)
455 goto out_del;
456
457 error = kobject_register(&ls->ls_kobj);
458 if (error)
459 goto out_del;
460
461 error = do_uevent(ls, 1);
462 if (error)
463 goto out_unreg;
464
465 *lockspace = ls;
466 return 0;
467
468 out_unreg:
469 kobject_unregister(&ls->ls_kobj);
470 out_del:
471 dlm_delete_debug_file(ls);
472 spin_lock(&lslist_lock);
473 list_del(&ls->ls_list);
474 spin_unlock(&lslist_lock);
475 dlm_recoverd_stop(ls);
476 out_rcomfree:
477 kfree(ls->ls_recover_buf);
478 out_dirfree:
479 kfree(ls->ls_dirtbl);
480 out_lkbfree:
481 kfree(ls->ls_lkbtbl);
482 out_rsbfree:
483 kfree(ls->ls_rsbtbl);
484 out_lsfree:
485 kfree(ls);
486 out:
487 module_put(THIS_MODULE);
488 return error;
489}
490
491int dlm_new_lockspace(char *name, int namelen, void **lockspace,
492 uint32_t flags, int lvblen)
493{
494 int error = 0;
495
496 down(&ls_lock);
497 if (!ls_count)
498 error = threads_start();
499 if (error)
500 goto out;
501
502 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
503 if (!error)
504 ls_count++;
505 out:
506 up(&ls_lock);
507 return error;
508}
509
510/* Return 1 if the lockspace still has active remote locks,
511 * 2 if the lockspace still has active local locks.
512 */
513static int lockspace_busy(struct dlm_ls *ls)
514{
515 int i, lkb_found = 0;
516 struct dlm_lkb *lkb;
517
518 /* NOTE: We check the lockidtbl here rather than the resource table.
519 This is because there may be LKBs queued as ASTs that have been
520 unlinked from their RSBs and are pending deletion once the AST has
521 been delivered */
522
523 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
524 read_lock(&ls->ls_lkbtbl[i].lock);
525 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
526 lkb_found = 1;
527 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
528 lkb_idtbl_list) {
529 if (!lkb->lkb_nodeid) {
530 read_unlock(&ls->ls_lkbtbl[i].lock);
531 return 2;
532 }
533 }
534 }
535 read_unlock(&ls->ls_lkbtbl[i].lock);
536 }
537 return lkb_found;
538}
539
540static int release_lockspace(struct dlm_ls *ls, int force)
541{
542 struct dlm_lkb *lkb;
543 struct dlm_rsb *rsb;
544 struct list_head *head;
545 int i;
546 int busy = lockspace_busy(ls);
547
548 if (busy > force)
549 return -EBUSY;
550
551 if (force < 3)
552 do_uevent(ls, 0);
553
554 dlm_recoverd_stop(ls);
555
556 remove_lockspace(ls);
557
558 dlm_delete_debug_file(ls);
559
560 dlm_astd_suspend();
561
562 kfree(ls->ls_recover_buf);
563
564 /*
565 * Free direntry structs.
566 */
567
568 dlm_dir_clear(ls);
569 kfree(ls->ls_dirtbl);
570
571 /*
572 * Free all lkb's on lkbtbl[] lists.
573 */
574
575 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
576 head = &ls->ls_lkbtbl[i].list;
577 while (!list_empty(head)) {
578 lkb = list_entry(head->next, struct dlm_lkb,
579 lkb_idtbl_list);
580
581 list_del(&lkb->lkb_idtbl_list);
582
583 dlm_del_ast(lkb);
584
585 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
586 free_lvb(lkb->lkb_lvbptr);
587
588 free_lkb(lkb);
589 }
590 }
591 dlm_astd_resume();
592
593 kfree(ls->ls_lkbtbl);
594
595 /*
596 * Free all rsb's on rsbtbl[] lists
597 */
598
599 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
600 head = &ls->ls_rsbtbl[i].list;
601 while (!list_empty(head)) {
602 rsb = list_entry(head->next, struct dlm_rsb,
603 res_hashchain);
604
605 list_del(&rsb->res_hashchain);
606 free_rsb(rsb);
607 }
608
609 head = &ls->ls_rsbtbl[i].toss;
610 while (!list_empty(head)) {
611 rsb = list_entry(head->next, struct dlm_rsb,
612 res_hashchain);
613 list_del(&rsb->res_hashchain);
614 free_rsb(rsb);
615 }
616 }
617
618 kfree(ls->ls_rsbtbl);
619
620 /*
621 * Free structures on any other lists
622 */
623
624 kfree(ls->ls_recover_args);
625 dlm_clear_free_entries(ls);
626 dlm_clear_members(ls);
627 dlm_clear_members_gone(ls);
628 kfree(ls->ls_node_array);
629 kobject_unregister(&ls->ls_kobj);
630 kfree(ls);
631
632 down(&ls_lock);
633 ls_count--;
634 if (!ls_count)
635 threads_stop();
636 up(&ls_lock);
637
638 module_put(THIS_MODULE);
639 return 0;
640}
641
642/*
643 * Called when a system has released all its locks and is not going to use the
644 * lockspace any longer. We free everything we're managing for this lockspace.
645 * Remaining nodes will go through the recovery process as if we'd died. The
646 * lockspace must continue to function as usual, participating in recoveries,
647 * until this returns.
648 *
649 * Force has 4 possible values:
650 * 0 - don't destroy locksapce if it has any LKBs
651 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
652 * 2 - destroy lockspace regardless of LKBs
653 * 3 - destroy lockspace as part of a forced shutdown
654 */
655
656int dlm_release_lockspace(void *lockspace, int force)
657{
658 struct dlm_ls *ls;
659
660 ls = dlm_find_lockspace_local(lockspace);
661 if (!ls)
662 return -EINVAL;
663 dlm_put_lockspace(ls);
664 return release_lockspace(ls, force);
665}
666