[PATCH] ocfs2: fix hang in dlm lock resource mastery
[linux-2.6-block.git] / fs / ocfs2 / dlm / dlmmaster.c
CommitLineData
6714d8e8
KH
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmmod.c
5 *
6 * standalone DLM module
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
24 *
25 */
26
27
28#include <linux/module.h>
29#include <linux/fs.h>
30#include <linux/types.h>
31#include <linux/slab.h>
32#include <linux/highmem.h>
33#include <linux/utsname.h>
34#include <linux/init.h>
35#include <linux/sysctl.h>
36#include <linux/random.h>
37#include <linux/blkdev.h>
38#include <linux/socket.h>
39#include <linux/inet.h>
40#include <linux/spinlock.h>
41#include <linux/delay.h>
42
43
44#include "cluster/heartbeat.h"
45#include "cluster/nodemanager.h"
46#include "cluster/tcp.h"
47
48#include "dlmapi.h"
49#include "dlmcommon.h"
50#include "dlmdebug.h"
82353b59 51#include "dlmdomain.h"
6714d8e8
KH
52
53#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
54#include "cluster/masklog.h"
55
/* Kinds of master list entry (mle) kept on dlm->master_list. */
enum dlm_mle_type {
	DLM_MLE_BLOCK = 0,	/* this node is waiting for mastery to finish */
	DLM_MLE_MASTER,		/* this node is trying to master the lock itself */
	DLM_MLE_MIGRATION,	/* a migration of the lock is in progress */
};
61
62struct dlm_lock_name
63{
64 u8 len;
65 u8 name[DLM_LOCKID_NAME_MAX];
66};
67
68struct dlm_master_list_entry
69{
70 struct list_head list;
71 struct list_head hb_events;
72 struct dlm_ctxt *dlm;
73 spinlock_t spinlock;
74 wait_queue_head_t wq;
75 atomic_t woken;
76 struct kref mle_refs;
77 unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
80 unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
81 u8 master;
82 u8 new_master;
83 enum dlm_mle_type type;
84 struct o2hb_callback_func mle_hb_up;
85 struct o2hb_callback_func mle_hb_down;
86 union {
87 struct dlm_lock_resource *res;
88 struct dlm_lock_name name;
89 } u;
90};
91
92static void dlm_mle_node_down(struct dlm_ctxt *dlm,
93 struct dlm_master_list_entry *mle,
94 struct o2nm_node *node,
95 int idx);
96static void dlm_mle_node_up(struct dlm_ctxt *dlm,
97 struct dlm_master_list_entry *mle,
98 struct o2nm_node *node,
99 int idx);
100
101static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
103 unsigned int namelen, void *nodemap,
104 u32 flags);
105
106static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 struct dlm_master_list_entry *mle,
108 const char *name,
109 unsigned int namelen)
110{
111 struct dlm_lock_resource *res;
112
113 if (dlm != mle->dlm)
114 return 0;
115
116 if (mle->type == DLM_MLE_BLOCK ||
117 mle->type == DLM_MLE_MIGRATION) {
118 if (namelen != mle->u.name.len ||
119 memcmp(name, mle->u.name.name, namelen)!=0)
120 return 0;
121 } else {
122 res = mle->u.res;
123 if (namelen != res->lockname.len ||
124 memcmp(res->lockname.name, name, namelen) != 0)
125 return 0;
126 }
127 return 1;
128}
129
130#if 0
131/* Code here is included but defined out as it aids debugging */
132
133void dlm_print_one_mle(struct dlm_master_list_entry *mle)
134{
135 int i = 0, refs;
136 char *type;
137 char attached;
138 u8 master;
139 unsigned int namelen;
140 const char *name;
141 struct kref *k;
142
143 k = &mle->mle_refs;
144 if (mle->type == DLM_MLE_BLOCK)
145 type = "BLK";
146 else if (mle->type == DLM_MLE_MASTER)
147 type = "MAS";
148 else
149 type = "MIG";
150 refs = atomic_read(&k->refcount);
151 master = mle->master;
152 attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
153
154 if (mle->type != DLM_MLE_MASTER) {
155 namelen = mle->u.name.len;
156 name = mle->u.name.name;
157 } else {
158 namelen = mle->u.res->lockname.len;
159 name = mle->u.res->lockname.name;
160 }
161
162 mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n",
163 i, type, refs, master, mle->new_master, attached,
164 namelen, namelen, name);
165}
166
167static void dlm_dump_mles(struct dlm_ctxt *dlm)
168{
169 struct dlm_master_list_entry *mle;
170 struct list_head *iter;
171
172 mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
173 mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
174 spin_lock(&dlm->master_lock);
175 list_for_each(iter, &dlm->master_list) {
176 mle = list_entry(iter, struct dlm_master_list_entry, list);
177 dlm_print_one_mle(mle);
178 }
179 spin_unlock(&dlm->master_lock);
180}
181
6714d8e8
KH
182int dlm_dump_all_mles(const char __user *data, unsigned int len)
183{
184 struct list_head *iter;
185 struct dlm_ctxt *dlm;
186
187 spin_lock(&dlm_domain_lock);
188 list_for_each(iter, &dlm_domains) {
189 dlm = list_entry (iter, struct dlm_ctxt, list);
190 mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
191 dlm_dump_mles(dlm);
192 }
193 spin_unlock(&dlm_domain_lock);
194 return len;
195}
196EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
197
198#endif /* 0 */
199
200
201static kmem_cache_t *dlm_mle_cache = NULL;
202
203
204static void dlm_mle_release(struct kref *kref);
205static void dlm_init_mle(struct dlm_master_list_entry *mle,
206 enum dlm_mle_type type,
207 struct dlm_ctxt *dlm,
208 struct dlm_lock_resource *res,
209 const char *name,
210 unsigned int namelen);
211static void dlm_put_mle(struct dlm_master_list_entry *mle);
212static void __dlm_put_mle(struct dlm_master_list_entry *mle);
213static int dlm_find_mle(struct dlm_ctxt *dlm,
214 struct dlm_master_list_entry **mle,
215 char *name, unsigned int namelen);
216
217static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
218
219
220static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
221 struct dlm_lock_resource *res,
222 struct dlm_master_list_entry *mle,
223 int *blocked);
224static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
225 struct dlm_lock_resource *res,
226 struct dlm_master_list_entry *mle,
227 int blocked);
228static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
229 struct dlm_lock_resource *res,
230 struct dlm_master_list_entry *mle,
231 struct dlm_master_list_entry **oldmle,
232 const char *name, unsigned int namelen,
233 u8 new_master, u8 master);
234
235static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
236 struct dlm_lock_resource *res);
237static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
238 struct dlm_lock_resource *res);
239static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
240 struct dlm_lock_resource *res,
241 u8 target);
242
243
/*
 * Classify a negative errno value from the network path.
 *
 * @err: negative error code (e.g. -ECONNRESET).
 *
 * Returns 1 if the code is one of those listed below, which this
 * file treats as "the remote host is down", and 0 for anything else
 * (including 0 and positive values).
 *
 * The parameter is deliberately not called "errno": that identifier
 * is a macro wherever <errno.h> is in scope.
 */
int dlm_is_host_down(int err)
{
	switch (err) {
	case -EBADF:
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -ECONNRESET:
	case -EPIPE:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ETIMEDOUT:
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -ENETRESET:
	case -ESHUTDOWN:
	case -ENOPROTOOPT:
	case -EINVAL:	/* if returned from our tcp code,
			   this means there is no socket */
		return 1;
	default:
		return 0;
	}
}
267
268
269/*
270 * MASTER LIST FUNCTIONS
271 */
272
273
274/*
275 * regarding master list entries and heartbeat callbacks:
276 *
277 * in order to avoid sleeping and allocation that occurs in
278 * heartbeat, master list entries are simply attached to the
279 * dlm's established heartbeat callbacks. the mle is attached
280 * when it is created, and since the dlm->spinlock is held at
281 * that time, any heartbeat event will be properly discovered
282 * by the mle. the mle needs to be detached from the
283 * dlm->mle_hb_events list as soon as heartbeat events are no
284 * longer useful to the mle, and before the mle is freed.
285 *
286 * as a general rule, heartbeat events are no longer needed by
287 * the mle once an "answer" regarding the lock master has been
288 * received.
289 */
290static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
291 struct dlm_master_list_entry *mle)
292{
293 assert_spin_locked(&dlm->spinlock);
294
295 list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
296}
297
298
299static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
300 struct dlm_master_list_entry *mle)
301{
302 if (!list_empty(&mle->hb_events))
303 list_del_init(&mle->hb_events);
304}
305
306
307static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
308 struct dlm_master_list_entry *mle)
309{
310 spin_lock(&dlm->spinlock);
311 __dlm_mle_detach_hb_events(dlm, mle);
312 spin_unlock(&dlm->spinlock);
313}
314
315/* remove from list and free */
316static void __dlm_put_mle(struct dlm_master_list_entry *mle)
317{
318 struct dlm_ctxt *dlm;
319 dlm = mle->dlm;
320
321 assert_spin_locked(&dlm->spinlock);
322 assert_spin_locked(&dlm->master_lock);
323 BUG_ON(!atomic_read(&mle->mle_refs.refcount));
324
325 kref_put(&mle->mle_refs, dlm_mle_release);
326}
327
328
329/* must not have any spinlocks coming in */
330static void dlm_put_mle(struct dlm_master_list_entry *mle)
331{
332 struct dlm_ctxt *dlm;
333 dlm = mle->dlm;
334
335 spin_lock(&dlm->spinlock);
336 spin_lock(&dlm->master_lock);
337 __dlm_put_mle(mle);
338 spin_unlock(&dlm->master_lock);
339 spin_unlock(&dlm->spinlock);
340}
341
342static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
343{
344 kref_get(&mle->mle_refs);
345}
346
347static void dlm_init_mle(struct dlm_master_list_entry *mle,
348 enum dlm_mle_type type,
349 struct dlm_ctxt *dlm,
350 struct dlm_lock_resource *res,
351 const char *name,
352 unsigned int namelen)
353{
354 assert_spin_locked(&dlm->spinlock);
355
356 mle->dlm = dlm;
357 mle->type = type;
358 INIT_LIST_HEAD(&mle->list);
359 INIT_LIST_HEAD(&mle->hb_events);
360 memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
361 spin_lock_init(&mle->spinlock);
362 init_waitqueue_head(&mle->wq);
363 atomic_set(&mle->woken, 0);
364 kref_init(&mle->mle_refs);
365 memset(mle->response_map, 0, sizeof(mle->response_map));
366 mle->master = O2NM_MAX_NODES;
367 mle->new_master = O2NM_MAX_NODES;
368
369 if (mle->type == DLM_MLE_MASTER) {
370 BUG_ON(!res);
371 mle->u.res = res;
372 } else if (mle->type == DLM_MLE_BLOCK) {
373 BUG_ON(!name);
374 memcpy(mle->u.name.name, name, namelen);
375 mle->u.name.len = namelen;
376 } else /* DLM_MLE_MIGRATION */ {
377 BUG_ON(!name);
378 memcpy(mle->u.name.name, name, namelen);
379 mle->u.name.len = namelen;
380 }
381
382 /* copy off the node_map and register hb callbacks on our copy */
383 memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
384 memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
385 clear_bit(dlm->node_num, mle->vote_map);
386 clear_bit(dlm->node_num, mle->node_map);
387
388 /* attach the mle to the domain node up/down events */
389 __dlm_mle_attach_hb_events(dlm, mle);
390}
391
392
393/* returns 1 if found, 0 if not */
394static int dlm_find_mle(struct dlm_ctxt *dlm,
395 struct dlm_master_list_entry **mle,
396 char *name, unsigned int namelen)
397{
398 struct dlm_master_list_entry *tmpmle;
399 struct list_head *iter;
400
401 assert_spin_locked(&dlm->master_lock);
402
403 list_for_each(iter, &dlm->master_list) {
404 tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
405 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
406 continue;
407 dlm_get_mle(tmpmle);
408 *mle = tmpmle;
409 return 1;
410 }
411 return 0;
412}
413
414void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
415{
416 struct dlm_master_list_entry *mle;
417 struct list_head *iter;
418
419 assert_spin_locked(&dlm->spinlock);
420
421 list_for_each(iter, &dlm->mle_hb_events) {
422 mle = list_entry(iter, struct dlm_master_list_entry,
423 hb_events);
424 if (node_up)
425 dlm_mle_node_up(dlm, mle, NULL, idx);
426 else
427 dlm_mle_node_down(dlm, mle, NULL, idx);
428 }
429}
430
431static void dlm_mle_node_down(struct dlm_ctxt *dlm,
432 struct dlm_master_list_entry *mle,
433 struct o2nm_node *node, int idx)
434{
435 spin_lock(&mle->spinlock);
436
437 if (!test_bit(idx, mle->node_map))
438 mlog(0, "node %u already removed from nodemap!\n", idx);
439 else
440 clear_bit(idx, mle->node_map);
441
442 spin_unlock(&mle->spinlock);
443}
444
445static void dlm_mle_node_up(struct dlm_ctxt *dlm,
446 struct dlm_master_list_entry *mle,
447 struct o2nm_node *node, int idx)
448{
449 spin_lock(&mle->spinlock);
450
451 if (test_bit(idx, mle->node_map))
452 mlog(0, "node %u already in node map!\n", idx);
453 else
454 set_bit(idx, mle->node_map);
455
456 spin_unlock(&mle->spinlock);
457}
458
459
460int dlm_init_mle_cache(void)
461{
462 dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
463 sizeof(struct dlm_master_list_entry),
464 0, SLAB_HWCACHE_ALIGN,
465 NULL, NULL);
466 if (dlm_mle_cache == NULL)
467 return -ENOMEM;
468 return 0;
469}
470
471void dlm_destroy_mle_cache(void)
472{
473 if (dlm_mle_cache)
474 kmem_cache_destroy(dlm_mle_cache);
475}
476
477static void dlm_mle_release(struct kref *kref)
478{
479 struct dlm_master_list_entry *mle;
480 struct dlm_ctxt *dlm;
481
482 mlog_entry_void();
483
484 mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
485 dlm = mle->dlm;
486
487 if (mle->type != DLM_MLE_MASTER) {
488 mlog(0, "calling mle_release for %.*s, type %d\n",
489 mle->u.name.len, mle->u.name.name, mle->type);
490 } else {
491 mlog(0, "calling mle_release for %.*s, type %d\n",
492 mle->u.res->lockname.len,
493 mle->u.res->lockname.name, mle->type);
494 }
495 assert_spin_locked(&dlm->spinlock);
496 assert_spin_locked(&dlm->master_lock);
497
498 /* remove from list if not already */
499 if (!list_empty(&mle->list))
500 list_del_init(&mle->list);
501
502 /* detach the mle from the domain node up/down events */
503 __dlm_mle_detach_hb_events(dlm, mle);
504
505 /* NOTE: kfree under spinlock here.
506 * if this is bad, we can move this to a freelist. */
507 kmem_cache_free(dlm_mle_cache, mle);
508}
509
510
511/*
512 * LOCK RESOURCE FUNCTIONS
513 */
514
515static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
516 struct dlm_lock_resource *res,
517 u8 owner)
518{
519 assert_spin_locked(&res->spinlock);
520
521 mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
522
523 if (owner == dlm->node_num)
524 atomic_inc(&dlm->local_resources);
525 else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
526 atomic_inc(&dlm->unknown_resources);
527 else
528 atomic_inc(&dlm->remote_resources);
529
530 res->owner = owner;
531}
532
533void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
534 struct dlm_lock_resource *res, u8 owner)
535{
536 assert_spin_locked(&res->spinlock);
537
538 if (owner == res->owner)
539 return;
540
541 if (res->owner == dlm->node_num)
542 atomic_dec(&dlm->local_resources);
543 else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
544 atomic_dec(&dlm->unknown_resources);
545 else
546 atomic_dec(&dlm->remote_resources);
547
548 dlm_set_lockres_owner(dlm, res, owner);
549}
550
551
552static void dlm_lockres_release(struct kref *kref)
553{
554 struct dlm_lock_resource *res;
555
556 res = container_of(kref, struct dlm_lock_resource, refs);
557
558 /* This should not happen -- all lockres' have a name
559 * associated with them at init time. */
560 BUG_ON(!res->lockname.name);
561
562 mlog(0, "destroying lockres %.*s\n", res->lockname.len,
563 res->lockname.name);
564
565 /* By the time we're ready to blow this guy away, we shouldn't
566 * be on any lists. */
81f2094a 567 BUG_ON(!hlist_unhashed(&res->hash_node));
6714d8e8
KH
568 BUG_ON(!list_empty(&res->granted));
569 BUG_ON(!list_empty(&res->converting));
570 BUG_ON(!list_empty(&res->blocked));
571 BUG_ON(!list_empty(&res->dirty));
572 BUG_ON(!list_empty(&res->recovering));
573 BUG_ON(!list_empty(&res->purge));
574
575 kfree(res->lockname.name);
576
577 kfree(res);
578}
579
580void dlm_lockres_get(struct dlm_lock_resource *res)
581{
582 kref_get(&res->refs);
583}
584
585void dlm_lockres_put(struct dlm_lock_resource *res)
586{
587 kref_put(&res->refs, dlm_lockres_release);
588}
589
590static void dlm_init_lockres(struct dlm_ctxt *dlm,
591 struct dlm_lock_resource *res,
592 const char *name, unsigned int namelen)
593{
594 char *qname;
595
596 /* If we memset here, we lose our reference to the kmalloc'd
597 * res->lockname.name, so be sure to init every field
598 * correctly! */
599
600 qname = (char *) res->lockname.name;
601 memcpy(qname, name, namelen);
602
603 res->lockname.len = namelen;
604 res->lockname.hash = full_name_hash(name, namelen);
605
606 init_waitqueue_head(&res->wq);
607 spin_lock_init(&res->spinlock);
81f2094a 608 INIT_HLIST_NODE(&res->hash_node);
6714d8e8
KH
609 INIT_LIST_HEAD(&res->granted);
610 INIT_LIST_HEAD(&res->converting);
611 INIT_LIST_HEAD(&res->blocked);
612 INIT_LIST_HEAD(&res->dirty);
613 INIT_LIST_HEAD(&res->recovering);
614 INIT_LIST_HEAD(&res->purge);
615 atomic_set(&res->asts_reserved, 0);
616 res->migration_pending = 0;
617
618 kref_init(&res->refs);
619
620 /* just for consistency */
621 spin_lock(&res->spinlock);
622 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
623 spin_unlock(&res->spinlock);
624
625 res->state = DLM_LOCK_RES_IN_PROGRESS;
626
627 res->last_used = 0;
628
629 memset(res->lvb, 0, DLM_LVB_LEN);
630}
631
632struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
633 const char *name,
634 unsigned int namelen)
635{
636 struct dlm_lock_resource *res;
637
638 res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
639 if (!res)
640 return NULL;
641
642 res->lockname.name = kmalloc(namelen, GFP_KERNEL);
643 if (!res->lockname.name) {
644 kfree(res);
645 return NULL;
646 }
647
648 dlm_init_lockres(dlm, res, name, namelen);
649 return res;
650}
651
652/*
653 * lookup a lock resource by name.
654 * may already exist in the hashtable.
655 * lockid is null terminated
656 *
657 * if not, allocate enough for the lockres and for
658 * the temporary structure used in doing the mastering.
659 *
660 * also, do a lookup in the dlm->master_list to see
661 * if another node has begun mastering the same lock.
662 * if so, there should be a block entry in there
663 * for this name, and we should *not* attempt to master
664 * the lock here. need to wait around for that node
665 * to assert_master (or die).
666 *
667 */
668struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
669 const char *lockid,
670 int flags)
671{
672 struct dlm_lock_resource *tmpres=NULL, *res=NULL;
673 struct dlm_master_list_entry *mle = NULL;
674 struct dlm_master_list_entry *alloc_mle = NULL;
675 int blocked = 0;
676 int ret, nodenum;
677 struct dlm_node_iter iter;
678 unsigned int namelen;
679 int tries = 0;
680
681 BUG_ON(!lockid);
682
683 namelen = strlen(lockid);
684
685 mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
686
687lookup:
688 spin_lock(&dlm->spinlock);
689 tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
690 if (tmpres) {
691 spin_unlock(&dlm->spinlock);
692 mlog(0, "found in hash!\n");
693 if (res)
694 dlm_lockres_put(res);
695 res = tmpres;
696 goto leave;
697 }
698
699 if (!res) {
700 spin_unlock(&dlm->spinlock);
701 mlog(0, "allocating a new resource\n");
702 /* nothing found and we need to allocate one. */
703 alloc_mle = (struct dlm_master_list_entry *)
704 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
705 if (!alloc_mle)
706 goto leave;
707 res = dlm_new_lockres(dlm, lockid, namelen);
708 if (!res)
709 goto leave;
710 goto lookup;
711 }
712
713 mlog(0, "no lockres found, allocated our own: %p\n", res);
714
715 if (flags & LKM_LOCAL) {
716 /* caller knows it's safe to assume it's not mastered elsewhere
717 * DONE! return right away */
718 spin_lock(&res->spinlock);
719 dlm_change_lockres_owner(dlm, res, dlm->node_num);
720 __dlm_insert_lockres(dlm, res);
721 spin_unlock(&res->spinlock);
722 spin_unlock(&dlm->spinlock);
723 /* lockres still marked IN_PROGRESS */
724 goto wake_waiters;
725 }
726
727 /* check master list to see if another node has started mastering it */
728 spin_lock(&dlm->master_lock);
729
730 /* if we found a block, wait for lock to be mastered by another node */
731 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
732 if (blocked) {
733 if (mle->type == DLM_MLE_MASTER) {
734 mlog(ML_ERROR, "master entry for nonexistent lock!\n");
735 BUG();
736 } else if (mle->type == DLM_MLE_MIGRATION) {
737 /* migration is in progress! */
738 /* the good news is that we now know the
739 * "current" master (mle->master). */
740
741 spin_unlock(&dlm->master_lock);
742 assert_spin_locked(&dlm->spinlock);
743
744 /* set the lockres owner and hash it */
745 spin_lock(&res->spinlock);
746 dlm_set_lockres_owner(dlm, res, mle->master);
747 __dlm_insert_lockres(dlm, res);
748 spin_unlock(&res->spinlock);
749 spin_unlock(&dlm->spinlock);
750
751 /* master is known, detach */
752 dlm_mle_detach_hb_events(dlm, mle);
753 dlm_put_mle(mle);
754 mle = NULL;
755 goto wake_waiters;
756 }
757 } else {
758 /* go ahead and try to master lock on this node */
759 mle = alloc_mle;
760 /* make sure this does not get freed below */
761 alloc_mle = NULL;
762 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
763 set_bit(dlm->node_num, mle->maybe_map);
764 list_add(&mle->list, &dlm->master_list);
765 }
766
767 /* at this point there is either a DLM_MLE_BLOCK or a
768 * DLM_MLE_MASTER on the master list, so it's safe to add the
769 * lockres to the hashtable. anyone who finds the lock will
770 * still have to wait on the IN_PROGRESS. */
771
772 /* finally add the lockres to its hash bucket */
773 __dlm_insert_lockres(dlm, res);
774 /* get an extra ref on the mle in case this is a BLOCK
775 * if so, the creator of the BLOCK may try to put the last
776 * ref at this time in the assert master handler, so we
777 * need an extra one to keep from a bad ptr deref. */
778 dlm_get_mle(mle);
779 spin_unlock(&dlm->master_lock);
780 spin_unlock(&dlm->spinlock);
781
782 /* must wait for lock to be mastered elsewhere */
783 if (blocked)
784 goto wait;
785
786redo_request:
787 ret = -EINVAL;
788 dlm_node_iter_init(mle->vote_map, &iter);
789 while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
790 ret = dlm_do_master_request(mle, nodenum);
791 if (ret < 0)
792 mlog_errno(ret);
793 if (mle->master != O2NM_MAX_NODES) {
794 /* found a master ! */
9c6510a5
KH
795 if (mle->master <= nodenum)
796 break;
797 /* if our master request has not reached the master
798 * yet, keep going until it does. this is how the
799 * master will know that asserts are needed back to
800 * the lower nodes. */
801 mlog(0, "%s:%.*s: requests only up to %u but master "
802 "is %u, keep going\n", dlm->name, namelen,
803 lockid, nodenum, mle->master);
6714d8e8
KH
804 }
805 }
806
807wait:
808 /* keep going until the response map includes all nodes */
809 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
810 if (ret < 0) {
811 mlog(0, "%s:%.*s: node map changed, redo the "
812 "master request now, blocked=%d\n",
813 dlm->name, res->lockname.len,
814 res->lockname.name, blocked);
815 if (++tries > 20) {
816 mlog(ML_ERROR, "%s:%.*s: spinning on "
817 "dlm_wait_for_lock_mastery, blocked=%d\n",
818 dlm->name, res->lockname.len,
819 res->lockname.name, blocked);
820 dlm_print_one_lock_resource(res);
821 /* dlm_print_one_mle(mle); */
822 tries = 0;
823 }
824 goto redo_request;
825 }
826
827 mlog(0, "lockres mastered by %u\n", res->owner);
828 /* make sure we never continue without this */
829 BUG_ON(res->owner == O2NM_MAX_NODES);
830
831 /* master is known, detach if not already detached */
832 dlm_mle_detach_hb_events(dlm, mle);
833 dlm_put_mle(mle);
834 /* put the extra ref */
835 dlm_put_mle(mle);
836
837wake_waiters:
838 spin_lock(&res->spinlock);
839 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
840 spin_unlock(&res->spinlock);
841 wake_up(&res->wq);
842
843leave:
844 /* need to free the unused mle */
845 if (alloc_mle)
846 kmem_cache_free(dlm_mle_cache, alloc_mle);
847
848 return res;
849}
850
851
852#define DLM_MASTERY_TIMEOUT_MS 5000
853
854static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
855 struct dlm_lock_resource *res,
856 struct dlm_master_list_entry *mle,
857 int *blocked)
858{
859 u8 m;
860 int ret, bit;
861 int map_changed, voting_done;
862 int assert, sleep;
863
864recheck:
865 ret = 0;
866 assert = 0;
867
868 /* check if another node has already become the owner */
869 spin_lock(&res->spinlock);
870 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
9c6510a5
KH
871 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
872 res->lockname.len, res->lockname.name, res->owner);
6714d8e8 873 spin_unlock(&res->spinlock);
9c6510a5
KH
874 /* this will cause the master to re-assert across
875 * the whole cluster, freeing up mles */
876 ret = dlm_do_master_request(mle, res->owner);
877 if (ret < 0) {
878 /* give recovery a chance to run */
879 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
880 msleep(500);
881 goto recheck;
882 }
883 ret = 0;
6714d8e8
KH
884 goto leave;
885 }
886 spin_unlock(&res->spinlock);
887
888 spin_lock(&mle->spinlock);
889 m = mle->master;
890 map_changed = (memcmp(mle->vote_map, mle->node_map,
891 sizeof(mle->vote_map)) != 0);
892 voting_done = (memcmp(mle->vote_map, mle->response_map,
893 sizeof(mle->vote_map)) == 0);
894
895 /* restart if we hit any errors */
896 if (map_changed) {
897 int b;
898 mlog(0, "%s: %.*s: node map changed, restarting\n",
899 dlm->name, res->lockname.len, res->lockname.name);
900 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
901 b = (mle->type == DLM_MLE_BLOCK);
902 if ((*blocked && !b) || (!*blocked && b)) {
903 mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
904 dlm->name, res->lockname.len, res->lockname.name,
905 *blocked, b);
906 *blocked = b;
907 }
908 spin_unlock(&mle->spinlock);
909 if (ret < 0) {
910 mlog_errno(ret);
911 goto leave;
912 }
913 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
914 "rechecking now\n", dlm->name, res->lockname.len,
915 res->lockname.name);
916 goto recheck;
917 }
918
919 if (m != O2NM_MAX_NODES) {
920 /* another node has done an assert!
921 * all done! */
922 sleep = 0;
923 } else {
924 sleep = 1;
925 /* have all nodes responded? */
926 if (voting_done && !*blocked) {
927 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
928 if (dlm->node_num <= bit) {
929 /* my node number is lowest.
930 * now tell other nodes that I am
931 * mastering this. */
932 mle->master = dlm->node_num;
933 assert = 1;
934 sleep = 0;
935 }
936 /* if voting is done, but we have not received
937 * an assert master yet, we must sleep */
938 }
939 }
940
941 spin_unlock(&mle->spinlock);
942
943 /* sleep if we haven't finished voting yet */
944 if (sleep) {
945 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
946
947 /*
948 if (atomic_read(&mle->mle_refs.refcount) < 2)
949 mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
950 atomic_read(&mle->mle_refs.refcount),
951 res->lockname.len, res->lockname.name);
952 */
953 atomic_set(&mle->woken, 0);
954 (void)wait_event_timeout(mle->wq,
955 (atomic_read(&mle->woken) == 1),
956 timeo);
957 if (res->owner == O2NM_MAX_NODES) {
958 mlog(0, "waiting again\n");
959 goto recheck;
960 }
961 mlog(0, "done waiting, master is %u\n", res->owner);
962 ret = 0;
963 goto leave;
964 }
965
966 ret = 0; /* done */
967 if (assert) {
968 m = dlm->node_num;
969 mlog(0, "about to master %.*s here, this=%u\n",
970 res->lockname.len, res->lockname.name, m);
971 ret = dlm_do_assert_master(dlm, res->lockname.name,
972 res->lockname.len, mle->vote_map, 0);
973 if (ret) {
974 /* This is a failure in the network path,
975 * not in the response to the assert_master
976 * (any nonzero response is a BUG on this node).
977 * Most likely a socket just got disconnected
978 * due to node death. */
979 mlog_errno(ret);
980 }
981 /* no longer need to restart lock mastery.
982 * all living nodes have been contacted. */
983 ret = 0;
984 }
985
986 /* set the lockres owner */
987 spin_lock(&res->spinlock);
988 dlm_change_lockres_owner(dlm, res, m);
989 spin_unlock(&res->spinlock);
990
991leave:
992 return ret;
993}
994
995struct dlm_bitmap_diff_iter
996{
997 int curnode;
998 unsigned long *orig_bm;
999 unsigned long *cur_bm;
1000 unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1001};
1002
/* How a node's bit changed between the original and current bitmap. */
enum dlm_node_state_change {
	NODE_DOWN = -1,		/* was set in the original, now clear */
	NODE_NO_CHANGE = 0,
	NODE_UP = 1,		/* was clear in the original, now set */
};
1009
1010static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1011 unsigned long *orig_bm,
1012 unsigned long *cur_bm)
1013{
1014 unsigned long p1, p2;
1015 int i;
1016
1017 iter->curnode = -1;
1018 iter->orig_bm = orig_bm;
1019 iter->cur_bm = cur_bm;
1020
1021 for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1022 p1 = *(iter->orig_bm + i);
1023 p2 = *(iter->cur_bm + i);
1024 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1025 }
1026}
1027
1028static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1029 enum dlm_node_state_change *state)
1030{
1031 int bit;
1032
1033 if (iter->curnode >= O2NM_MAX_NODES)
1034 return -ENOENT;
1035
1036 bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1037 iter->curnode+1);
1038 if (bit >= O2NM_MAX_NODES) {
1039 iter->curnode = O2NM_MAX_NODES;
1040 return -ENOENT;
1041 }
1042
1043 /* if it was there in the original then this node died */
1044 if (test_bit(bit, iter->orig_bm))
1045 *state = NODE_DOWN;
1046 else
1047 *state = NODE_UP;
1048
1049 iter->curnode = bit;
1050 return bit;
1051}
1052
1053
1054static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1055 struct dlm_lock_resource *res,
1056 struct dlm_master_list_entry *mle,
1057 int blocked)
1058{
1059 struct dlm_bitmap_diff_iter bdi;
1060 enum dlm_node_state_change sc;
1061 int node;
1062 int ret = 0;
1063
1064 mlog(0, "something happened such that the "
1065 "master process may need to be restarted!\n");
1066
1067 assert_spin_locked(&mle->spinlock);
1068
1069 dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1070 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1071 while (node >= 0) {
1072 if (sc == NODE_UP) {
e2faea4c
KH
1073 /* a node came up. clear any old vote from
1074 * the response map and set it in the vote map
1075 * then restart the mastery. */
1076 mlog(ML_NOTICE, "node %d up while restarting\n", node);
6714d8e8
KH
1077
1078 /* redo the master request, but only for the new node */
1079 mlog(0, "sending request to new node\n");
1080 clear_bit(node, mle->response_map);
1081 set_bit(node, mle->vote_map);
1082 } else {
1083 mlog(ML_ERROR, "node down! %d\n", node);
1084
1085 /* if the node wasn't involved in mastery skip it,
1086 * but clear it out from the maps so that it will
1087 * not affect mastery of this lockres */
1088 clear_bit(node, mle->response_map);
1089 clear_bit(node, mle->vote_map);
1090 if (!test_bit(node, mle->maybe_map))
1091 goto next;
1092
1093 /* if we're already blocked on lock mastery, and the
1094 * dead node wasn't the expected master, or there is
1095 * another node in the maybe_map, keep waiting */
1096 if (blocked) {
1097 int lowest = find_next_bit(mle->maybe_map,
1098 O2NM_MAX_NODES, 0);
1099
1100 /* act like it was never there */
1101 clear_bit(node, mle->maybe_map);
1102
1103 if (node != lowest)
1104 goto next;
1105
1106 mlog(ML_ERROR, "expected master %u died while "
1107 "this node was blocked waiting on it!\n",
1108 node);
1109 lowest = find_next_bit(mle->maybe_map,
1110 O2NM_MAX_NODES,
1111 lowest+1);
1112 if (lowest < O2NM_MAX_NODES) {
1113 mlog(0, "still blocked. waiting "
1114 "on %u now\n", lowest);
1115 goto next;
1116 }
1117
1118 /* mle is an MLE_BLOCK, but there is now
1119 * nothing left to block on. we need to return
1120 * all the way back out and try again with
1121 * an MLE_MASTER. dlm_do_local_recovery_cleanup
1122 * has already run, so the mle refcount is ok */
1123 mlog(0, "no longer blocking. we can "
1124 "try to master this here\n");
1125 mle->type = DLM_MLE_MASTER;
1126 memset(mle->maybe_map, 0,
1127 sizeof(mle->maybe_map));
1128 memset(mle->response_map, 0,
1129 sizeof(mle->maybe_map));
1130 memcpy(mle->vote_map, mle->node_map,
1131 sizeof(mle->node_map));
1132 mle->u.res = res;
1133 set_bit(dlm->node_num, mle->maybe_map);
1134
1135 ret = -EAGAIN;
1136 goto next;
1137 }
1138
1139 clear_bit(node, mle->maybe_map);
1140 if (node > dlm->node_num)
1141 goto next;
1142
1143 mlog(0, "dead node in map!\n");
1144 /* yuck. go back and re-contact all nodes
1145 * in the vote_map, removing this node. */
1146 memset(mle->response_map, 0,
1147 sizeof(mle->response_map));
1148 }
1149 ret = -EAGAIN;
1150next:
1151 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1152 }
1153 return ret;
1154}
1155
1156
1157/*
1158 * DLM_MASTER_REQUEST_MSG
1159 *
1160 * returns: 0 on success,
1161 * -errno on a network error
1162 *
1163 * on error, the caller should assume the target node is "dead"
1164 *
1165 */
1166
1167static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
1168{
1169 struct dlm_ctxt *dlm = mle->dlm;
1170 struct dlm_master_request request;
1171 int ret, response=0, resend;
1172
1173 memset(&request, 0, sizeof(request));
1174 request.node_idx = dlm->node_num;
1175
1176 BUG_ON(mle->type == DLM_MLE_MIGRATION);
1177
1178 if (mle->type != DLM_MLE_MASTER) {
1179 request.namelen = mle->u.name.len;
1180 memcpy(request.name, mle->u.name.name, request.namelen);
1181 } else {
1182 request.namelen = mle->u.res->lockname.len;
1183 memcpy(request.name, mle->u.res->lockname.name,
1184 request.namelen);
1185 }
1186
1187again:
1188 ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1189 sizeof(request), to, &response);
1190 if (ret < 0) {
1191 if (ret == -ESRCH) {
1192 /* should never happen */
1193 mlog(ML_ERROR, "TCP stack not ready!\n");
1194 BUG();
1195 } else if (ret == -EINVAL) {
1196 mlog(ML_ERROR, "bad args passed to o2net!\n");
1197 BUG();
1198 } else if (ret == -ENOMEM) {
1199 mlog(ML_ERROR, "out of memory while trying to send "
1200 "network message! retrying\n");
1201 /* this is totally crude */
1202 msleep(50);
1203 goto again;
1204 } else if (!dlm_is_host_down(ret)) {
1205 /* not a network error. bad. */
1206 mlog_errno(ret);
1207 mlog(ML_ERROR, "unhandled error!");
1208 BUG();
1209 }
1210 /* all other errors should be network errors,
1211 * and likely indicate node death */
1212 mlog(ML_ERROR, "link to %d went down!\n", to);
1213 goto out;
1214 }
1215
1216 ret = 0;
1217 resend = 0;
1218 spin_lock(&mle->spinlock);
1219 switch (response) {
1220 case DLM_MASTER_RESP_YES:
1221 set_bit(to, mle->response_map);
1222 mlog(0, "node %u is the master, response=YES\n", to);
1223 mle->master = to;
1224 break;
1225 case DLM_MASTER_RESP_NO:
1226 mlog(0, "node %u not master, response=NO\n", to);
1227 set_bit(to, mle->response_map);
1228 break;
1229 case DLM_MASTER_RESP_MAYBE:
1230 mlog(0, "node %u not master, response=MAYBE\n", to);
1231 set_bit(to, mle->response_map);
1232 set_bit(to, mle->maybe_map);
1233 break;
1234 case DLM_MASTER_RESP_ERROR:
1235 mlog(0, "node %u hit an error, resending\n", to);
1236 resend = 1;
1237 response = 0;
1238 break;
1239 default:
1240 mlog(ML_ERROR, "bad response! %u\n", response);
1241 BUG();
1242 }
1243 spin_unlock(&mle->spinlock);
1244 if (resend) {
1245 /* this is also totally crude */
1246 msleep(50);
1247 goto again;
1248 }
1249
1250out:
1251 return ret;
1252}
1253
1254/*
1255 * locks that can be taken here:
1256 * dlm->spinlock
1257 * res->spinlock
1258 * mle->spinlock
1259 * dlm->master_list
1260 *
1261 * if possible, TRIM THIS DOWN!!!
1262 */
1263int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1264{
1265 u8 response = DLM_MASTER_RESP_MAYBE;
1266 struct dlm_ctxt *dlm = data;
9c6510a5 1267 struct dlm_lock_resource *res = NULL;
6714d8e8
KH
1268 struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1269 struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1270 char *name;
1271 unsigned int namelen;
1272 int found, ret;
1273 int set_maybe;
9c6510a5 1274 int dispatch_assert = 0;
6714d8e8
KH
1275
1276 if (!dlm_grab(dlm))
1277 return DLM_MASTER_RESP_NO;
1278
1279 if (!dlm_domain_fully_joined(dlm)) {
1280 response = DLM_MASTER_RESP_NO;
1281 goto send_response;
1282 }
1283
1284 name = request->name;
1285 namelen = request->namelen;
1286
1287 if (namelen > DLM_LOCKID_NAME_MAX) {
1288 response = DLM_IVBUFLEN;
1289 goto send_response;
1290 }
1291
1292way_up_top:
1293 spin_lock(&dlm->spinlock);
1294 res = __dlm_lookup_lockres(dlm, name, namelen);
1295 if (res) {
1296 spin_unlock(&dlm->spinlock);
1297
1298 /* take care of the easy cases up front */
1299 spin_lock(&res->spinlock);
1300 if (res->state & DLM_LOCK_RES_RECOVERING) {
1301 spin_unlock(&res->spinlock);
1302 mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1303 "being recovered\n");
1304 response = DLM_MASTER_RESP_ERROR;
1305 if (mle)
1306 kmem_cache_free(dlm_mle_cache, mle);
1307 goto send_response;
1308 }
1309
1310 if (res->owner == dlm->node_num) {
6714d8e8
KH
1311 spin_unlock(&res->spinlock);
1312 // mlog(0, "this node is the master\n");
1313 response = DLM_MASTER_RESP_YES;
1314 if (mle)
1315 kmem_cache_free(dlm_mle_cache, mle);
1316
1317 /* this node is the owner.
1318 * there is some extra work that needs to
1319 * happen now. the requesting node has
1320 * caused all nodes up to this one to
1321 * create mles. this node now needs to
1322 * go back and clean those up. */
9c6510a5 1323 dispatch_assert = 1;
6714d8e8
KH
1324 goto send_response;
1325 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1326 spin_unlock(&res->spinlock);
1327 // mlog(0, "node %u is the master\n", res->owner);
1328 response = DLM_MASTER_RESP_NO;
1329 if (mle)
1330 kmem_cache_free(dlm_mle_cache, mle);
1331 goto send_response;
1332 }
1333
1334 /* ok, there is no owner. either this node is
1335 * being blocked, or it is actively trying to
1336 * master this lock. */
1337 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1338 mlog(ML_ERROR, "lock with no owner should be "
1339 "in-progress!\n");
1340 BUG();
1341 }
1342
1343 // mlog(0, "lockres is in progress...\n");
1344 spin_lock(&dlm->master_lock);
1345 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1346 if (!found) {
1347 mlog(ML_ERROR, "no mle found for this lock!\n");
1348 BUG();
1349 }
1350 set_maybe = 1;
1351 spin_lock(&tmpmle->spinlock);
1352 if (tmpmle->type == DLM_MLE_BLOCK) {
1353 // mlog(0, "this node is waiting for "
1354 // "lockres to be mastered\n");
1355 response = DLM_MASTER_RESP_NO;
1356 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1357 mlog(0, "node %u is master, but trying to migrate to "
1358 "node %u.\n", tmpmle->master, tmpmle->new_master);
1359 if (tmpmle->master == dlm->node_num) {
1360 response = DLM_MASTER_RESP_YES;
1361 mlog(ML_ERROR, "no owner on lockres, but this "
1362 "node is trying to migrate it to %u?!\n",
1363 tmpmle->new_master);
1364 BUG();
1365 } else {
1366 /* the real master can respond on its own */
1367 response = DLM_MASTER_RESP_NO;
1368 }
1369 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1370 set_maybe = 0;
9c6510a5 1371 if (tmpmle->master == dlm->node_num) {
6714d8e8 1372 response = DLM_MASTER_RESP_YES;
9c6510a5
KH
1373 /* this node will be the owner.
1374 * go back and clean the mles on any
1375 * other nodes */
1376 dispatch_assert = 1;
1377 } else
6714d8e8
KH
1378 response = DLM_MASTER_RESP_NO;
1379 } else {
1380 // mlog(0, "this node is attempting to "
1381 // "master lockres\n");
1382 response = DLM_MASTER_RESP_MAYBE;
1383 }
1384 if (set_maybe)
1385 set_bit(request->node_idx, tmpmle->maybe_map);
1386 spin_unlock(&tmpmle->spinlock);
1387
1388 spin_unlock(&dlm->master_lock);
1389 spin_unlock(&res->spinlock);
1390
1391 /* keep the mle attached to heartbeat events */
1392 dlm_put_mle(tmpmle);
1393 if (mle)
1394 kmem_cache_free(dlm_mle_cache, mle);
1395 goto send_response;
1396 }
1397
1398 /*
1399 * lockres doesn't exist on this node
1400 * if there is an MLE_BLOCK, return NO
1401 * if there is an MLE_MASTER, return MAYBE
1402 * otherwise, add an MLE_BLOCK, return NO
1403 */
1404 spin_lock(&dlm->master_lock);
1405 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1406 if (!found) {
1407 /* this lockid has never been seen on this node yet */
1408 // mlog(0, "no mle found\n");
1409 if (!mle) {
1410 spin_unlock(&dlm->master_lock);
1411 spin_unlock(&dlm->spinlock);
1412
1413 mle = (struct dlm_master_list_entry *)
1414 kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
1415 if (!mle) {
6714d8e8 1416 response = DLM_MASTER_RESP_ERROR;
9c6510a5 1417 mlog_errno(-ENOMEM);
6714d8e8
KH
1418 goto send_response;
1419 }
1420 spin_lock(&dlm->spinlock);
1421 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1422 name, namelen);
1423 spin_unlock(&dlm->spinlock);
1424 goto way_up_top;
1425 }
1426
1427 // mlog(0, "this is second time thru, already allocated, "
1428 // "add the block.\n");
1429 set_bit(request->node_idx, mle->maybe_map);
1430 list_add(&mle->list, &dlm->master_list);
1431 response = DLM_MASTER_RESP_NO;
1432 } else {
1433 // mlog(0, "mle was found\n");
1434 set_maybe = 1;
1435 spin_lock(&tmpmle->spinlock);
9c6510a5
KH
1436 if (tmpmle->master == dlm->node_num) {
1437 mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1438 BUG();
1439 }
6714d8e8
KH
1440 if (tmpmle->type == DLM_MLE_BLOCK)
1441 response = DLM_MASTER_RESP_NO;
1442 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1443 mlog(0, "migration mle was found (%u->%u)\n",
1444 tmpmle->master, tmpmle->new_master);
6714d8e8
KH
1445 /* real master can respond on its own */
1446 response = DLM_MASTER_RESP_NO;
9c6510a5
KH
1447 } else
1448 response = DLM_MASTER_RESP_MAYBE;
6714d8e8
KH
1449 if (set_maybe)
1450 set_bit(request->node_idx, tmpmle->maybe_map);
1451 spin_unlock(&tmpmle->spinlock);
1452 }
1453 spin_unlock(&dlm->master_lock);
1454 spin_unlock(&dlm->spinlock);
1455
1456 if (found) {
1457 /* keep the mle attached to heartbeat events */
1458 dlm_put_mle(tmpmle);
1459 }
1460send_response:
9c6510a5
KH
1461
1462 if (dispatch_assert) {
1463 if (response != DLM_MASTER_RESP_YES)
1464 mlog(ML_ERROR, "invalid response %d\n", response);
1465 if (!res) {
1466 mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1467 BUG();
1468 }
1469 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1470 dlm->node_num, res->lockname.len, res->lockname.name);
1471 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1472 DLM_ASSERT_MASTER_MLE_CLEANUP);
1473 if (ret < 0) {
1474 mlog(ML_ERROR, "failed to dispatch assert master work\n");
1475 response = DLM_MASTER_RESP_ERROR;
1476 }
1477 }
1478
6714d8e8
KH
1479 dlm_put(dlm);
1480 return response;
1481}
1482
1483/*
1484 * DLM_ASSERT_MASTER_MSG
1485 */
1486
1487
1488/*
1489 * NOTE: this can be used for debugging
1490 * can periodically run all locks owned by this node
1491 * and re-assert across the cluster...
1492 */
1493static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1494 unsigned int namelen, void *nodemap,
1495 u32 flags)
1496{
1497 struct dlm_assert_master assert;
1498 int to, tmpret;
1499 struct dlm_node_iter iter;
1500 int ret = 0;
9c6510a5 1501 int reassert;
6714d8e8
KH
1502
1503 BUG_ON(namelen > O2NM_MAX_NAME_LEN);
9c6510a5
KH
1504again:
1505 reassert = 0;
6714d8e8
KH
1506
1507 /* note that if this nodemap is empty, it returns 0 */
1508 dlm_node_iter_init(nodemap, &iter);
1509 while ((to = dlm_node_iter_next(&iter)) >= 0) {
1510 int r = 0;
1511 mlog(0, "sending assert master to %d (%.*s)\n", to,
1512 namelen, lockname);
1513 memset(&assert, 0, sizeof(assert));
1514 assert.node_idx = dlm->node_num;
1515 assert.namelen = namelen;
1516 memcpy(assert.name, lockname, namelen);
1517 assert.flags = cpu_to_be32(flags);
1518
1519 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1520 &assert, sizeof(assert), to, &r);
1521 if (tmpret < 0) {
1522 mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
1523 if (!dlm_is_host_down(tmpret)) {
1524 mlog(ML_ERROR, "unhandled error!\n");
1525 BUG();
1526 }
1527 /* a node died. finish out the rest of the nodes. */
1528 mlog(ML_ERROR, "link to %d went down!\n", to);
1529 /* any nonzero status return will do */
1530 ret = tmpret;
1531 } else if (r < 0) {
1532 /* ok, something horribly messed. kill thyself. */
1533 mlog(ML_ERROR,"during assert master of %.*s to %u, "
1534 "got %d.\n", namelen, lockname, to, r);
1535 dlm_dump_lock_resources(dlm);
1536 BUG();
9c6510a5
KH
1537 } else if (r == EAGAIN) {
1538 mlog(0, "%.*s: node %u create mles on other "
1539 "nodes and requests a re-assert\n",
1540 namelen, lockname, to);
1541 reassert = 1;
6714d8e8
KH
1542 }
1543 }
1544
9c6510a5
KH
1545 if (reassert)
1546 goto again;
1547
6714d8e8
KH
1548 return ret;
1549}
1550
1551/*
1552 * locks that can be taken here:
1553 * dlm->spinlock
1554 * res->spinlock
1555 * mle->spinlock
1556 * dlm->master_list
1557 *
1558 * if possible, TRIM THIS DOWN!!!
1559 */
1560int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1561{
1562 struct dlm_ctxt *dlm = data;
1563 struct dlm_master_list_entry *mle = NULL;
1564 struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1565 struct dlm_lock_resource *res = NULL;
1566 char *name;
1567 unsigned int namelen;
1568 u32 flags;
9c6510a5
KH
1569 int master_request = 0;
1570 int ret = 0;
6714d8e8
KH
1571
1572 if (!dlm_grab(dlm))
1573 return 0;
1574
1575 name = assert->name;
1576 namelen = assert->namelen;
1577 flags = be32_to_cpu(assert->flags);
1578
1579 if (namelen > DLM_LOCKID_NAME_MAX) {
1580 mlog(ML_ERROR, "Invalid name length!");
1581 goto done;
1582 }
1583
1584 spin_lock(&dlm->spinlock);
1585
1586 if (flags)
1587 mlog(0, "assert_master with flags: %u\n", flags);
1588
1589 /* find the MLE */
1590 spin_lock(&dlm->master_lock);
1591 if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1592 /* not an error, could be master just re-asserting */
1593 mlog(0, "just got an assert_master from %u, but no "
1594 "MLE for it! (%.*s)\n", assert->node_idx,
1595 namelen, name);
1596 } else {
1597 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1598 if (bit >= O2NM_MAX_NODES) {
1599 /* not necessarily an error, though less likely.
1600 * could be master just re-asserting. */
1601 mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
1602 "is asserting! (%.*s)\n", assert->node_idx,
1603 namelen, name);
1604 } else if (bit != assert->node_idx) {
1605 if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1606 mlog(0, "master %u was found, %u should "
1607 "back off\n", assert->node_idx, bit);
1608 } else {
1609 /* with the fix for bug 569, a higher node
1610 * number winning the mastery will respond
1611 * YES to mastery requests, but this node
1612 * had no way of knowing. let it pass. */
1613 mlog(ML_ERROR, "%u is the lowest node, "
1614 "%u is asserting. (%.*s) %u must "
1615 "have begun after %u won.\n", bit,
1616 assert->node_idx, namelen, name, bit,
1617 assert->node_idx);
1618 }
1619 }
1620 }
1621 spin_unlock(&dlm->master_lock);
1622
1623 /* ok everything checks out with the MLE
1624 * now check to see if there is a lockres */
1625 res = __dlm_lookup_lockres(dlm, name, namelen);
1626 if (res) {
1627 spin_lock(&res->spinlock);
1628 if (res->state & DLM_LOCK_RES_RECOVERING) {
1629 mlog(ML_ERROR, "%u asserting but %.*s is "
1630 "RECOVERING!\n", assert->node_idx, namelen, name);
1631 goto kill;
1632 }
1633 if (!mle) {
1634 if (res->owner != assert->node_idx) {
1635 mlog(ML_ERROR, "assert_master from "
1636 "%u, but current owner is "
1637 "%u! (%.*s)\n",
1638 assert->node_idx, res->owner,
1639 namelen, name);
1640 goto kill;
1641 }
1642 } else if (mle->type != DLM_MLE_MIGRATION) {
1643 if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1644 /* owner is just re-asserting */
1645 if (res->owner == assert->node_idx) {
1646 mlog(0, "owner %u re-asserting on "
1647 "lock %.*s\n", assert->node_idx,
1648 namelen, name);
1649 goto ok;
1650 }
1651 mlog(ML_ERROR, "got assert_master from "
1652 "node %u, but %u is the owner! "
1653 "(%.*s)\n", assert->node_idx,
1654 res->owner, namelen, name);
1655 goto kill;
1656 }
1657 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1658 mlog(ML_ERROR, "got assert from %u, but lock "
1659 "with no owner should be "
1660 "in-progress! (%.*s)\n",
1661 assert->node_idx,
1662 namelen, name);
1663 goto kill;
1664 }
1665 } else /* mle->type == DLM_MLE_MIGRATION */ {
1666 /* should only be getting an assert from new master */
1667 if (assert->node_idx != mle->new_master) {
1668 mlog(ML_ERROR, "got assert from %u, but "
1669 "new master is %u, and old master "
1670 "was %u (%.*s)\n",
1671 assert->node_idx, mle->new_master,
1672 mle->master, namelen, name);
1673 goto kill;
1674 }
1675
1676 }
1677ok:
1678 spin_unlock(&res->spinlock);
1679 }
1680 spin_unlock(&dlm->spinlock);
1681
1682 // mlog(0, "woo! got an assert_master from node %u!\n",
1683 // assert->node_idx);
1684 if (mle) {
9c6510a5
KH
1685 int extra_ref = 0;
1686 int nn = -1;
6714d8e8
KH
1687
1688 spin_lock(&mle->spinlock);
9c6510a5
KH
1689 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1690 extra_ref = 1;
1691 else {
1692 /* MASTER mle: if any bits set in the response map
1693 * then the calling node needs to re-assert to clear
1694 * up nodes that this node contacted */
1695 while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1696 nn+1)) < O2NM_MAX_NODES) {
1697 if (nn != dlm->node_num && nn != assert->node_idx)
1698 master_request = 1;
1699 }
1700 }
6714d8e8
KH
1701 mle->master = assert->node_idx;
1702 atomic_set(&mle->woken, 1);
1703 wake_up(&mle->wq);
1704 spin_unlock(&mle->spinlock);
1705
1706 if (mle->type == DLM_MLE_MIGRATION && res) {
1707 mlog(0, "finishing off migration of lockres %.*s, "
1708 "from %u to %u\n",
1709 res->lockname.len, res->lockname.name,
1710 dlm->node_num, mle->new_master);
1711 spin_lock(&res->spinlock);
1712 res->state &= ~DLM_LOCK_RES_MIGRATING;
1713 dlm_change_lockres_owner(dlm, res, mle->new_master);
1714 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1715 spin_unlock(&res->spinlock);
1716 }
1717 /* master is known, detach if not already detached */
1718 dlm_mle_detach_hb_events(dlm, mle);
1719 dlm_put_mle(mle);
1720
1721 if (extra_ref) {
1722 /* the assert master message now balances the extra
1723 * ref given by the master / migration request message.
1724 * if this is the last put, it will be removed
1725 * from the list. */
1726 dlm_put_mle(mle);
1727 }
1728 }
1729
1730done:
9c6510a5 1731 ret = 0;
6714d8e8
KH
1732 if (res)
1733 dlm_lockres_put(res);
1734 dlm_put(dlm);
9c6510a5
KH
1735 if (master_request) {
1736 mlog(0, "need to tell master to reassert\n");
1737 ret = EAGAIN; // positive. negative would shoot down the node.
1738 }
1739 return ret;
6714d8e8
KH
1740
1741kill:
1742 /* kill the caller! */
1743 spin_unlock(&res->spinlock);
1744 spin_unlock(&dlm->spinlock);
1745 dlm_lockres_put(res);
1746 mlog(ML_ERROR, "Bad message received from another node. Dumping state "
1747 "and killing the other node now! This node is OK and can continue.\n");
1748 dlm_dump_lock_resources(dlm);
1749 dlm_put(dlm);
1750 return -EINVAL;
1751}
1752
1753int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1754 struct dlm_lock_resource *res,
1755 int ignore_higher, u8 request_from, u32 flags)
1756{
1757 struct dlm_work_item *item;
1758 item = kcalloc(1, sizeof(*item), GFP_KERNEL);
1759 if (!item)
1760 return -ENOMEM;
1761
1762
1763 /* queue up work for dlm_assert_master_worker */
1764 dlm_grab(dlm); /* get an extra ref for the work item */
1765 dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1766 item->u.am.lockres = res; /* already have a ref */
1767 /* can optionally ignore node numbers higher than this node */
1768 item->u.am.ignore_higher = ignore_higher;
1769 item->u.am.request_from = request_from;
1770 item->u.am.flags = flags;
1771
9c6510a5
KH
1772 if (ignore_higher)
1773 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
1774 res->lockname.name);
1775
6714d8e8
KH
1776 spin_lock(&dlm->work_lock);
1777 list_add_tail(&item->list, &dlm->work_list);
1778 spin_unlock(&dlm->work_lock);
1779
1780 schedule_work(&dlm->dispatched_work);
1781 return 0;
1782}
1783
1784static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1785{
1786 struct dlm_ctxt *dlm = data;
1787 int ret = 0;
1788 struct dlm_lock_resource *res;
1789 unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1790 int ignore_higher;
1791 int bit;
1792 u8 request_from;
1793 u32 flags;
1794
1795 dlm = item->dlm;
1796 res = item->u.am.lockres;
1797 ignore_higher = item->u.am.ignore_higher;
1798 request_from = item->u.am.request_from;
1799 flags = item->u.am.flags;
1800
1801 spin_lock(&dlm->spinlock);
1802 memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1803 spin_unlock(&dlm->spinlock);
1804
1805 clear_bit(dlm->node_num, nodemap);
1806 if (ignore_higher) {
1807 /* if is this just to clear up mles for nodes below
1808 * this node, do not send the message to the original
1809 * caller or any node number higher than this */
1810 clear_bit(request_from, nodemap);
1811 bit = dlm->node_num;
1812 while (1) {
1813 bit = find_next_bit(nodemap, O2NM_MAX_NODES,
1814 bit+1);
1815 if (bit >= O2NM_MAX_NODES)
1816 break;
1817 clear_bit(bit, nodemap);
1818 }
1819 }
1820
1821 /* this call now finishes out the nodemap
1822 * even if one or more nodes die */
1823 mlog(0, "worker about to master %.*s here, this=%u\n",
1824 res->lockname.len, res->lockname.name, dlm->node_num);
1825 ret = dlm_do_assert_master(dlm, res->lockname.name,
1826 res->lockname.len,
1827 nodemap, flags);
1828 if (ret < 0) {
1829 /* no need to restart, we are done */
1830 mlog_errno(ret);
1831 }
1832
1833 dlm_lockres_put(res);
1834
1835 mlog(0, "finished with dlm_assert_master_worker\n");
1836}
1837
1838
1839/*
1840 * DLM_MIGRATE_LOCKRES
1841 */
1842
1843
1844int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1845 u8 target)
1846{
1847 struct dlm_master_list_entry *mle = NULL;
1848 struct dlm_master_list_entry *oldmle = NULL;
1849 struct dlm_migratable_lockres *mres = NULL;
1850 int ret = -EINVAL;
1851 const char *name;
1852 unsigned int namelen;
1853 int mle_added = 0;
1854 struct list_head *queue, *iter;
1855 int i;
1856 struct dlm_lock *lock;
1857 int empty = 1;
1858
1859 if (!dlm_grab(dlm))
1860 return -EINVAL;
1861
1862 name = res->lockname.name;
1863 namelen = res->lockname.len;
1864
1865 mlog(0, "migrating %.*s to %u\n", namelen, name, target);
1866
1867 /*
1868 * ensure this lockres is a proper candidate for migration
1869 */
1870 spin_lock(&res->spinlock);
1871 if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
1872 mlog(0, "cannot migrate lockres with unknown owner!\n");
1873 spin_unlock(&res->spinlock);
1874 goto leave;
1875 }
1876 if (res->owner != dlm->node_num) {
1877 mlog(0, "cannot migrate lockres this node doesn't own!\n");
1878 spin_unlock(&res->spinlock);
1879 goto leave;
1880 }
1881 mlog(0, "checking queues...\n");
1882 queue = &res->granted;
1883 for (i=0; i<3; i++) {
1884 list_for_each(iter, queue) {
1885 lock = list_entry (iter, struct dlm_lock, list);
1886 empty = 0;
1887 if (lock->ml.node == dlm->node_num) {
1888 mlog(0, "found a lock owned by this node "
1889 "still on the %s queue! will not "
1890 "migrate this lockres\n",
1891 i==0 ? "granted" :
1892 (i==1 ? "converting" : "blocked"));
1893 spin_unlock(&res->spinlock);
1894 ret = -ENOTEMPTY;
1895 goto leave;
1896 }
1897 }
1898 queue++;
1899 }
1900 mlog(0, "all locks on this lockres are nonlocal. continuing\n");
1901 spin_unlock(&res->spinlock);
1902
1903 /* no work to do */
1904 if (empty) {
1905 mlog(0, "no locks were found on this lockres! done!\n");
1906 ret = 0;
1907 goto leave;
1908 }
1909
1910 /*
1911 * preallocate up front
1912 * if this fails, abort
1913 */
1914
1915 ret = -ENOMEM;
1916 mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
1917 if (!mres) {
1918 mlog_errno(ret);
1919 goto leave;
1920 }
1921
1922 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
1923 GFP_KERNEL);
1924 if (!mle) {
1925 mlog_errno(ret);
1926 goto leave;
1927 }
1928 ret = 0;
1929
1930 /*
1931 * find a node to migrate the lockres to
1932 */
1933
1934 mlog(0, "picking a migration node\n");
1935 spin_lock(&dlm->spinlock);
1936 /* pick a new node */
1937 if (!test_bit(target, dlm->domain_map) ||
1938 target >= O2NM_MAX_NODES) {
1939 target = dlm_pick_migration_target(dlm, res);
1940 }
1941 mlog(0, "node %u chosen for migration\n", target);
1942
1943 if (target >= O2NM_MAX_NODES ||
1944 !test_bit(target, dlm->domain_map)) {
1945 /* target chosen is not alive */
1946 ret = -EINVAL;
1947 }
1948
1949 if (ret) {
1950 spin_unlock(&dlm->spinlock);
1951 goto fail;
1952 }
1953
1954 mlog(0, "continuing with target = %u\n", target);
1955
1956 /*
1957 * clear any existing master requests and
1958 * add the migration mle to the list
1959 */
1960 spin_lock(&dlm->master_lock);
1961 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
1962 namelen, target, dlm->node_num);
1963 spin_unlock(&dlm->master_lock);
1964 spin_unlock(&dlm->spinlock);
1965
1966 if (ret == -EEXIST) {
1967 mlog(0, "another process is already migrating it\n");
1968 goto fail;
1969 }
1970 mle_added = 1;
1971
1972 /*
1973 * set the MIGRATING flag and flush asts
1974 * if we fail after this we need to re-dirty the lockres
1975 */
1976 if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
1977 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
1978 "the target went down.\n", res->lockname.len,
1979 res->lockname.name, target);
1980 spin_lock(&res->spinlock);
1981 res->state &= ~DLM_LOCK_RES_MIGRATING;
1982 spin_unlock(&res->spinlock);
1983 ret = -EINVAL;
1984 }
1985
1986fail:
1987 if (oldmle) {
1988 /* master is known, detach if not already detached */
1989 dlm_mle_detach_hb_events(dlm, oldmle);
1990 dlm_put_mle(oldmle);
1991 }
1992
1993 if (ret < 0) {
1994 if (mle_added) {
1995 dlm_mle_detach_hb_events(dlm, mle);
1996 dlm_put_mle(mle);
1997 } else if (mle) {
1998 kmem_cache_free(dlm_mle_cache, mle);
1999 }
2000 goto leave;
2001 }
2002
2003 /*
2004 * at this point, we have a migration target, an mle
2005 * in the master list, and the MIGRATING flag set on
2006 * the lockres
2007 */
2008
2009
2010 /* get an extra reference on the mle.
2011 * otherwise the assert_master from the new
2012 * master will destroy this.
2013 * also, make sure that all callers of dlm_get_mle
2014 * take both dlm->spinlock and dlm->master_lock */
2015 spin_lock(&dlm->spinlock);
2016 spin_lock(&dlm->master_lock);
2017 dlm_get_mle(mle);
2018 spin_unlock(&dlm->master_lock);
2019 spin_unlock(&dlm->spinlock);
2020
2021 /* notify new node and send all lock state */
2022 /* call send_one_lockres with migration flag.
2023 * this serves as notice to the target node that a
2024 * migration is starting. */
2025 ret = dlm_send_one_lockres(dlm, res, mres, target,
2026 DLM_MRES_MIGRATION);
2027
2028 if (ret < 0) {
2029 mlog(0, "migration to node %u failed with %d\n",
2030 target, ret);
2031 /* migration failed, detach and clean up mle */
2032 dlm_mle_detach_hb_events(dlm, mle);
2033 dlm_put_mle(mle);
2034 dlm_put_mle(mle);
2035 goto leave;
2036 }
2037
2038 /* at this point, the target sends a message to all nodes,
2039 * (using dlm_do_migrate_request). this node is skipped since
2040 * we had to put an mle in the list to begin the process. this
2041 * node now waits for target to do an assert master. this node
2042 * will be the last one notified, ensuring that the migration
2043 * is complete everywhere. if the target dies while this is
2044 * going on, some nodes could potentially see the target as the
2045 * master, so it is important that my recovery finds the migration
2046 * mle and sets the master to UNKNONWN. */
2047
2048
2049 /* wait for new node to assert master */
2050 while (1) {
2051 ret = wait_event_interruptible_timeout(mle->wq,
2052 (atomic_read(&mle->woken) == 1),
2053 msecs_to_jiffies(5000));
2054
2055 if (ret >= 0) {
2056 if (atomic_read(&mle->woken) == 1 ||
2057 res->owner == target)
2058 break;
2059
2060 mlog(0, "timed out during migration\n");
e2faea4c
KH
2061 /* avoid hang during shutdown when migrating lockres
2062 * to a node which also goes down */
2063 if (dlm_is_node_dead(dlm, target)) {
2064 mlog(0, "%s:%.*s: expected migration target %u "
2065 "is no longer up. restarting.\n",
2066 dlm->name, res->lockname.len,
2067 res->lockname.name, target);
2068 ret = -ERESTARTSYS;
2069 }
6714d8e8
KH
2070 }
2071 if (ret == -ERESTARTSYS) {
2072 /* migration failed, detach and clean up mle */
2073 dlm_mle_detach_hb_events(dlm, mle);
2074 dlm_put_mle(mle);
2075 dlm_put_mle(mle);
2076 goto leave;
2077 }
2078 /* TODO: if node died: stop, clean up, return error */
2079 }
2080
2081 /* all done, set the owner, clear the flag */
2082 spin_lock(&res->spinlock);
2083 dlm_set_lockres_owner(dlm, res, target);
2084 res->state &= ~DLM_LOCK_RES_MIGRATING;
2085 dlm_remove_nonlocal_locks(dlm, res);
2086 spin_unlock(&res->spinlock);
2087 wake_up(&res->wq);
2088
2089 /* master is known, detach if not already detached */
2090 dlm_mle_detach_hb_events(dlm, mle);
2091 dlm_put_mle(mle);
2092 ret = 0;
2093
2094 dlm_lockres_calc_usage(dlm, res);
2095
2096leave:
2097 /* re-dirty the lockres if we failed */
2098 if (ret < 0)
2099 dlm_kick_thread(dlm, res);
2100
2101 /* TODO: cleanup */
2102 if (mres)
2103 free_page((unsigned long)mres);
2104
2105 dlm_put(dlm);
2106
2107 mlog(0, "returning %d\n", ret);
2108 return ret;
2109}
2110EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
2111
2112int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2113{
2114 int ret;
2115 spin_lock(&dlm->ast_lock);
2116 spin_lock(&lock->spinlock);
2117 ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2118 spin_unlock(&lock->spinlock);
2119 spin_unlock(&dlm->ast_lock);
2120 return ret;
2121}
2122
2123static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2124 struct dlm_lock_resource *res,
2125 u8 mig_target)
2126{
2127 int can_proceed;
2128 spin_lock(&res->spinlock);
2129 can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2130 spin_unlock(&res->spinlock);
2131
2132 /* target has died, so make the caller break out of the
2133 * wait_event, but caller must recheck the domain_map */
2134 spin_lock(&dlm->spinlock);
2135 if (!test_bit(mig_target, dlm->domain_map))
2136 can_proceed = 1;
2137 spin_unlock(&dlm->spinlock);
2138 return can_proceed;
2139}
2140
2141int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2142{
2143 int ret;
2144 spin_lock(&res->spinlock);
2145 ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2146 spin_unlock(&res->spinlock);
2147 return ret;
2148}
2149
2150
2151static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2152 struct dlm_lock_resource *res,
2153 u8 target)
2154{
2155 int ret = 0;
2156
2157 mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2158 res->lockname.len, res->lockname.name, dlm->node_num,
2159 target);
2160 /* need to set MIGRATING flag on lockres. this is done by
2161 * ensuring that all asts have been flushed for this lockres. */
2162 spin_lock(&res->spinlock);
2163 BUG_ON(res->migration_pending);
2164 res->migration_pending = 1;
2165 /* strategy is to reserve an extra ast then release
2166 * it below, letting the release do all of the work */
2167 __dlm_lockres_reserve_ast(res);
2168 spin_unlock(&res->spinlock);
2169
2170 /* now flush all the pending asts.. hang out for a bit */
2171 dlm_kick_thread(dlm, res);
2172 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2173 dlm_lockres_release_ast(dlm, res);
2174
2175 mlog(0, "about to wait on migration_wq, dirty=%s\n",
2176 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2177 /* if the extra ref we just put was the final one, this
2178 * will pass thru immediately. otherwise, we need to wait
2179 * for the last ast to finish. */
2180again:
2181 ret = wait_event_interruptible_timeout(dlm->migration_wq,
2182 dlm_migration_can_proceed(dlm, res, target),
2183 msecs_to_jiffies(1000));
2184 if (ret < 0) {
2185 mlog(0, "woken again: migrating? %s, dead? %s\n",
2186 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2187 test_bit(target, dlm->domain_map) ? "no":"yes");
2188 } else {
2189 mlog(0, "all is well: migrating? %s, dead? %s\n",
2190 res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2191 test_bit(target, dlm->domain_map) ? "no":"yes");
2192 }
2193 if (!dlm_migration_can_proceed(dlm, res, target)) {
2194 mlog(0, "trying again...\n");
2195 goto again;
2196 }
2197
2198 /* did the target go down or die? */
2199 spin_lock(&dlm->spinlock);
2200 if (!test_bit(target, dlm->domain_map)) {
2201 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2202 target);
2203 ret = -EHOSTDOWN;
2204 }
2205 spin_unlock(&dlm->spinlock);
2206
2207 /*
2208 * at this point:
2209 *
2210 * o the DLM_LOCK_RES_MIGRATING flag is set
2211 * o there are no pending asts on this lockres
2212 * o all processes trying to reserve an ast on this
2213 * lockres must wait for the MIGRATING flag to clear
2214 */
2215 return ret;
2216}
2217
2218/* last step in the migration process.
2219 * original master calls this to free all of the dlm_lock
2220 * structures that used to be for other nodes. */
2221static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2222 struct dlm_lock_resource *res)
2223{
2224 struct list_head *iter, *iter2;
2225 struct list_head *queue = &res->granted;
2226 int i;
2227 struct dlm_lock *lock;
2228
2229 assert_spin_locked(&res->spinlock);
2230
2231 BUG_ON(res->owner == dlm->node_num);
2232
2233 for (i=0; i<3; i++) {
2234 list_for_each_safe(iter, iter2, queue) {
2235 lock = list_entry (iter, struct dlm_lock, list);
2236 if (lock->ml.node != dlm->node_num) {
2237 mlog(0, "putting lock for node %u\n",
2238 lock->ml.node);
2239 /* be extra careful */
2240 BUG_ON(!list_empty(&lock->ast_list));
2241 BUG_ON(!list_empty(&lock->bast_list));
2242 BUG_ON(lock->ast_pending);
2243 BUG_ON(lock->bast_pending);
2244 list_del_init(&lock->list);
2245 dlm_lock_put(lock);
2246 }
2247 }
2248 queue++;
2249 }
2250}
2251
2252/* for now this is not too intelligent. we will
2253 * need stats to make this do the right thing.
2254 * this just finds the first lock on one of the
2255 * queues and uses that node as the target. */
2256static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2257 struct dlm_lock_resource *res)
2258{
2259 int i;
2260 struct list_head *queue = &res->granted;
2261 struct list_head *iter;
2262 struct dlm_lock *lock;
2263 int nodenum;
2264
2265 assert_spin_locked(&dlm->spinlock);
2266
2267 spin_lock(&res->spinlock);
2268 for (i=0; i<3; i++) {
2269 list_for_each(iter, queue) {
2270 /* up to the caller to make sure this node
2271 * is alive */
2272 lock = list_entry (iter, struct dlm_lock, list);
2273 if (lock->ml.node != dlm->node_num) {
2274 spin_unlock(&res->spinlock);
2275 return lock->ml.node;
2276 }
2277 }
2278 queue++;
2279 }
2280 spin_unlock(&res->spinlock);
2281 mlog(0, "have not found a suitable target yet! checking domain map\n");
2282
2283 /* ok now we're getting desperate. pick anyone alive. */
2284 nodenum = -1;
2285 while (1) {
2286 nodenum = find_next_bit(dlm->domain_map,
2287 O2NM_MAX_NODES, nodenum+1);
2288 mlog(0, "found %d in domain map\n", nodenum);
2289 if (nodenum >= O2NM_MAX_NODES)
2290 break;
2291 if (nodenum != dlm->node_num) {
2292 mlog(0, "picking %d\n", nodenum);
2293 return nodenum;
2294 }
2295 }
2296
2297 mlog(0, "giving up. no master to migrate to\n");
2298 return DLM_LOCK_RES_OWNER_UNKNOWN;
2299}
2300
2301
2302
2303/* this is called by the new master once all lockres
2304 * data has been received */
2305static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2306 struct dlm_lock_resource *res,
2307 u8 master, u8 new_master,
2308 struct dlm_node_iter *iter)
2309{
2310 struct dlm_migrate_request migrate;
2311 int ret, status = 0;
2312 int nodenum;
2313
2314 memset(&migrate, 0, sizeof(migrate));
2315 migrate.namelen = res->lockname.len;
2316 memcpy(migrate.name, res->lockname.name, migrate.namelen);
2317 migrate.new_master = new_master;
2318 migrate.master = master;
2319
2320 ret = 0;
2321
2322 /* send message to all nodes, except the master and myself */
2323 while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2324 if (nodenum == master ||
2325 nodenum == new_master)
2326 continue;
2327
2328 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2329 &migrate, sizeof(migrate), nodenum,
2330 &status);
2331 if (ret < 0)
2332 mlog_errno(ret);
2333 else if (status < 0) {
2334 mlog(0, "migrate request (node %u) returned %d!\n",
2335 nodenum, status);
2336 ret = status;
2337 }
2338 }
2339
2340 if (ret < 0)
2341 mlog_errno(ret);
2342
2343 mlog(0, "returning ret=%d\n", ret);
2344 return ret;
2345}
2346
2347
2348/* if there is an existing mle for this lockres, we now know who the master is.
2349 * (the one who sent us *this* message) we can clear it up right away.
2350 * since the process that put the mle on the list still has a reference to it,
2351 * we can unhash it now, set the master and wake the process. as a result,
2352 * we will have no mle in the list to start with. now we can add an mle for
2353 * the migration and this should be the only one found for those scanning the
2354 * list. */
2355int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2356{
2357 struct dlm_ctxt *dlm = data;
2358 struct dlm_lock_resource *res = NULL;
2359 struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2360 struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2361 const char *name;
2362 unsigned int namelen;
2363 int ret = 0;
2364
2365 if (!dlm_grab(dlm))
2366 return -EINVAL;
2367
2368 name = migrate->name;
2369 namelen = migrate->namelen;
2370
2371 /* preallocate.. if this fails, abort */
2372 mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2373 GFP_KERNEL);
2374
2375 if (!mle) {
2376 ret = -ENOMEM;
2377 goto leave;
2378 }
2379
2380 /* check for pre-existing lock */
2381 spin_lock(&dlm->spinlock);
2382 res = __dlm_lookup_lockres(dlm, name, namelen);
2383 spin_lock(&dlm->master_lock);
2384
2385 if (res) {
2386 spin_lock(&res->spinlock);
2387 if (res->state & DLM_LOCK_RES_RECOVERING) {
2388 /* if all is working ok, this can only mean that we got
2389 * a migrate request from a node that we now see as
2390 * dead. what can we do here? drop it to the floor? */
2391 spin_unlock(&res->spinlock);
2392 mlog(ML_ERROR, "Got a migrate request, but the "
2393 "lockres is marked as recovering!");
2394 kmem_cache_free(dlm_mle_cache, mle);
2395 ret = -EINVAL; /* need a better solution */
2396 goto unlock;
2397 }
2398 res->state |= DLM_LOCK_RES_MIGRATING;
2399 spin_unlock(&res->spinlock);
2400 }
2401
2402 /* ignore status. only nonzero status would BUG. */
2403 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2404 name, namelen,
2405 migrate->new_master,
2406 migrate->master);
2407
2408unlock:
2409 spin_unlock(&dlm->master_lock);
2410 spin_unlock(&dlm->spinlock);
2411
2412 if (oldmle) {
2413 /* master is known, detach if not already detached */
2414 dlm_mle_detach_hb_events(dlm, oldmle);
2415 dlm_put_mle(oldmle);
2416 }
2417
2418 if (res)
2419 dlm_lockres_put(res);
2420leave:
2421 dlm_put(dlm);
2422 return ret;
2423}
2424
2425/* must be holding dlm->spinlock and dlm->master_lock
2426 * when adding a migration mle, we can clear any other mles
2427 * in the master list because we know with certainty that
2428 * the master is "master". so we remove any old mle from
2429 * the list after setting it's master field, and then add
2430 * the new migration mle. this way we can hold with the rule
2431 * of having only one mle for a given lock name at all times. */
2432static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2433 struct dlm_lock_resource *res,
2434 struct dlm_master_list_entry *mle,
2435 struct dlm_master_list_entry **oldmle,
2436 const char *name, unsigned int namelen,
2437 u8 new_master, u8 master)
2438{
2439 int found;
2440 int ret = 0;
2441
2442 *oldmle = NULL;
2443
2444 mlog_entry_void();
2445
2446 assert_spin_locked(&dlm->spinlock);
2447 assert_spin_locked(&dlm->master_lock);
2448
2449 /* caller is responsible for any ref taken here on oldmle */
2450 found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2451 if (found) {
2452 struct dlm_master_list_entry *tmp = *oldmle;
2453 spin_lock(&tmp->spinlock);
2454 if (tmp->type == DLM_MLE_MIGRATION) {
2455 if (master == dlm->node_num) {
2456 /* ah another process raced me to it */
2457 mlog(0, "tried to migrate %.*s, but some "
2458 "process beat me to it\n",
2459 namelen, name);
2460 ret = -EEXIST;
2461 } else {
2462 /* bad. 2 NODES are trying to migrate! */
2463 mlog(ML_ERROR, "migration error mle: "
2464 "master=%u new_master=%u // request: "
2465 "master=%u new_master=%u // "
2466 "lockres=%.*s\n",
2467 tmp->master, tmp->new_master,
2468 master, new_master,
2469 namelen, name);
2470 BUG();
2471 }
2472 } else {
2473 /* this is essentially what assert_master does */
2474 tmp->master = master;
2475 atomic_set(&tmp->woken, 1);
2476 wake_up(&tmp->wq);
2477 /* remove it from the list so that only one
2478 * mle will be found */
2479 list_del_init(&tmp->list);
2480 }
2481 spin_unlock(&tmp->spinlock);
2482 }
2483
2484 /* now add a migration mle to the tail of the list */
2485 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2486 mle->new_master = new_master;
2487 mle->master = master;
2488 /* do this for consistency with other mle types */
2489 set_bit(new_master, mle->maybe_map);
2490 list_add(&mle->list, &dlm->master_list);
2491
2492 return ret;
2493}
2494
2495
2496void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2497{
2498 struct list_head *iter, *iter2;
2499 struct dlm_master_list_entry *mle;
2500 struct dlm_lock_resource *res;
2501
2502 mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2503top:
2504 assert_spin_locked(&dlm->spinlock);
2505
2506 /* clean the master list */
2507 spin_lock(&dlm->master_lock);
2508 list_for_each_safe(iter, iter2, &dlm->master_list) {
2509 mle = list_entry(iter, struct dlm_master_list_entry, list);
2510
2511 BUG_ON(mle->type != DLM_MLE_BLOCK &&
2512 mle->type != DLM_MLE_MASTER &&
2513 mle->type != DLM_MLE_MIGRATION);
2514
2515 /* MASTER mles are initiated locally. the waiting
2516 * process will notice the node map change
2517 * shortly. let that happen as normal. */
2518 if (mle->type == DLM_MLE_MASTER)
2519 continue;
2520
2521
2522 /* BLOCK mles are initiated by other nodes.
2523 * need to clean up if the dead node would have
2524 * been the master. */
2525 if (mle->type == DLM_MLE_BLOCK) {
2526 int bit;
2527
2528 spin_lock(&mle->spinlock);
2529 bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2530 if (bit != dead_node) {
2531 mlog(0, "mle found, but dead node %u would "
2532 "not have been master\n", dead_node);
2533 spin_unlock(&mle->spinlock);
2534 } else {
2535 /* must drop the refcount by one since the
2536 * assert_master will never arrive. this
2537 * may result in the mle being unlinked and
2538 * freed, but there may still be a process
2539 * waiting in the dlmlock path which is fine. */
2540 mlog(ML_ERROR, "node %u was expected master\n",
2541 dead_node);
2542 atomic_set(&mle->woken, 1);
2543 spin_unlock(&mle->spinlock);
2544 wake_up(&mle->wq);
f671c09b
KH
2545 /* do not need events any longer, so detach
2546 * from heartbeat */
2547 __dlm_mle_detach_hb_events(dlm, mle);
6714d8e8
KH
2548 __dlm_put_mle(mle);
2549 }
2550 continue;
2551 }
2552
2553 /* everything else is a MIGRATION mle */
2554
2555 /* the rule for MIGRATION mles is that the master
2556 * becomes UNKNOWN if *either* the original or
2557 * the new master dies. all UNKNOWN lockreses
2558 * are sent to whichever node becomes the recovery
2559 * master. the new master is responsible for
2560 * determining if there is still a master for
2561 * this lockres, or if he needs to take over
2562 * mastery. either way, this node should expect
2563 * another message to resolve this. */
2564 if (mle->master != dead_node &&
2565 mle->new_master != dead_node)
2566 continue;
2567
2568 /* if we have reached this point, this mle needs to
2569 * be removed from the list and freed. */
2570
2571 /* remove from the list early. NOTE: unlinking
2572 * list_head while in list_for_each_safe */
2573 spin_lock(&mle->spinlock);
2574 list_del_init(&mle->list);
2575 atomic_set(&mle->woken, 1);
2576 spin_unlock(&mle->spinlock);
2577 wake_up(&mle->wq);
2578
2579 mlog(0, "node %u died during migration from "
2580 "%u to %u!\n", dead_node,
2581 mle->master, mle->new_master);
2582 /* if there is a lockres associated with this
2583 * mle, find it and set its owner to UNKNOWN */
2584 res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2585 mle->u.name.len);
2586 if (res) {
2587 /* unfortunately if we hit this rare case, our
2588 * lock ordering is messed. we need to drop
2589 * the master lock so that we can take the
2590 * lockres lock, meaning that we will have to
2591 * restart from the head of list. */
2592 spin_unlock(&dlm->master_lock);
2593
2594 /* move lockres onto recovery list */
2595 spin_lock(&res->spinlock);
2596 dlm_set_lockres_owner(dlm, res,
2597 DLM_LOCK_RES_OWNER_UNKNOWN);
2598 dlm_move_lockres_to_recovery_list(dlm, res);
2599 spin_unlock(&res->spinlock);
2600 dlm_lockres_put(res);
2601
f671c09b
KH
2602 /* about to get rid of mle, detach from heartbeat */
2603 __dlm_mle_detach_hb_events(dlm, mle);
2604
6714d8e8
KH
2605 /* dump the mle */
2606 spin_lock(&dlm->master_lock);
2607 __dlm_put_mle(mle);
2608 spin_unlock(&dlm->master_lock);
2609
2610 /* restart */
2611 goto top;
2612 }
2613
2614 /* this may be the last reference */
2615 __dlm_put_mle(mle);
2616 }
2617 spin_unlock(&dlm->master_lock);
2618}
2619
2620
2621int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2622 u8 old_master)
2623{
2624 struct dlm_node_iter iter;
2625 int ret = 0;
2626
2627 spin_lock(&dlm->spinlock);
2628 dlm_node_iter_init(dlm->domain_map, &iter);
2629 clear_bit(old_master, iter.node_map);
2630 clear_bit(dlm->node_num, iter.node_map);
2631 spin_unlock(&dlm->spinlock);
2632
2633 mlog(0, "now time to do a migrate request to other nodes\n");
2634 ret = dlm_do_migrate_request(dlm, res, old_master,
2635 dlm->node_num, &iter);
2636 if (ret < 0) {
2637 mlog_errno(ret);
2638 goto leave;
2639 }
2640
2641 mlog(0, "doing assert master of %.*s to all except the original node\n",
2642 res->lockname.len, res->lockname.name);
2643 /* this call now finishes out the nodemap
2644 * even if one or more nodes die */
2645 ret = dlm_do_assert_master(dlm, res->lockname.name,
2646 res->lockname.len, iter.node_map,
2647 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2648 if (ret < 0) {
2649 /* no longer need to retry. all living nodes contacted. */
2650 mlog_errno(ret);
2651 ret = 0;
2652 }
2653
2654 memset(iter.node_map, 0, sizeof(iter.node_map));
2655 set_bit(old_master, iter.node_map);
2656 mlog(0, "doing assert master of %.*s back to %u\n",
2657 res->lockname.len, res->lockname.name, old_master);
2658 ret = dlm_do_assert_master(dlm, res->lockname.name,
2659 res->lockname.len, iter.node_map,
2660 DLM_ASSERT_MASTER_FINISH_MIGRATION);
2661 if (ret < 0) {
2662 mlog(0, "assert master to original master failed "
2663 "with %d.\n", ret);
2664 /* the only nonzero status here would be because of
2665 * a dead original node. we're done. */
2666 ret = 0;
2667 }
2668
2669 /* all done, set the owner, clear the flag */
2670 spin_lock(&res->spinlock);
2671 dlm_set_lockres_owner(dlm, res, dlm->node_num);
2672 res->state &= ~DLM_LOCK_RES_MIGRATING;
2673 spin_unlock(&res->spinlock);
2674 /* re-dirty it on the new master */
2675 dlm_kick_thread(dlm, res);
2676 wake_up(&res->wq);
2677leave:
2678 return ret;
2679}
2680
2681/*
2682 * LOCKRES AST REFCOUNT
2683 * this is integral to migration
2684 */
2685
2686/* for future intent to call an ast, reserve one ahead of time.
2687 * this should be called only after waiting on the lockres
2688 * with dlm_wait_on_lockres, and while still holding the
2689 * spinlock after the call. */
2690void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
2691{
2692 assert_spin_locked(&res->spinlock);
2693 if (res->state & DLM_LOCK_RES_MIGRATING) {
2694 __dlm_print_one_lock_resource(res);
2695 }
2696 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2697
2698 atomic_inc(&res->asts_reserved);
2699}
2700
2701/*
2702 * used to drop the reserved ast, either because it went unused,
2703 * or because the ast/bast was actually called.
2704 *
2705 * also, if there is a pending migration on this lockres,
2706 * and this was the last pending ast on the lockres,
2707 * atomically set the MIGRATING flag before we drop the lock.
2708 * this is how we ensure that migration can proceed with no
2709 * asts in progress. note that it is ok if the state of the
2710 * queues is such that a lock should be granted in the future
2711 * or that a bast should be fired, because the new master will
2712 * shuffle the lists on this lockres as soon as it is migrated.
2713 */
2714void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
2715 struct dlm_lock_resource *res)
2716{
2717 if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
2718 return;
2719
2720 if (!res->migration_pending) {
2721 spin_unlock(&res->spinlock);
2722 return;
2723 }
2724
2725 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2726 res->migration_pending = 0;
2727 res->state |= DLM_LOCK_RES_MIGRATING;
2728 spin_unlock(&res->spinlock);
2729 wake_up(&res->wq);
2730 wake_up(&dlm->migration_wq);
2731}