/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
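/*
 * Example (illustrative only): how the flags passed at stage 1 select the
 * operation.  "ls", "lksb" and the arguments are a hypothetical caller's
 * state, not names defined in this file.
 *
 *      dlm_lock(ls, mode, &lksb, 0, ...);                     -> request_lock
 *      dlm_lock(ls, mode, &lksb, DLM_LKF_CONVERT, ...);       -> convert_lock
 *      dlm_unlock(ls, lkid, 0, &lksb, astarg);                -> unlock_lock
 *      dlm_unlock(ls, lkid, DLM_LKF_CANCEL, &lksb, astarg);   -> cancel_lock
 */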
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "config.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
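/*
 * Reading the table above (an illustrative example): a conversion from
 * granted PW (row) down to NL (column) yields 0, so the lock's LVB is
 * written back to the resource; a conversion from granted NL (row) up to
 * EX (column) yields 1, so the resource's LVB is copied out to the caller.
 */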
#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
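/*
 * Usage sketch (illustrative): per the matrix, two PR (protected read)
 * locks can coexist while PR and EX cannot.
 *
 *      dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);     returns 1
 *      dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);     returns 0
 */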
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
               r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}
/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

        lkb->lkb_lksb->sb_status = rv;
        lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

        dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb))
                send_bast(r, lkb, rqmode);
        else {
                lkb->lkb_bastmode = rqmode;
                dlm_add_ast(lkb, AST_BAST);
        }
}
/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
        struct dlm_rsb *r;

        r = allocate_rsb(ls, len);
        if (!r)
                return NULL;

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        return r;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
                           unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error = 0;

        list_for_each_entry(r, head, res_hashchain) {
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
        return -EBADR;

 found:
        if (r->res_nodeid && (flags & R_MASTER))
                error = -ENOTBLK;
        *r_ret = r;
        return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;

        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

        if (dlm_no_directory(ls))
                goto out;

        if (r->res_nodeid == -1) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else if (r->res_nodeid > 0) {
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else {
                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
        }
 out:
        *r_ret = r;
        return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
{
        int error;
        write_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
        write_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
}
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r, *tmp;
        uint32_t hash, bucket;
        int error = 0;

        if (dlm_no_directory(ls))
                flags |= R_CREATE;

        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);

        error = search_rsb(ls, name, namelen, bucket, flags, &r);
        if (!error)
                goto out;

        if (error == -EBADR && !(flags & R_CREATE))
                goto out;

        /* the rsb was found but wasn't a master copy */
        if (error == -ENOTBLK)
                goto out;

        error = -ENOMEM;
        r = create_rsb(ls, name, namelen);
        if (!r)
                goto out;

        r->res_hash = hash;
        r->res_bucket = bucket;
        r->res_nodeid = -1;
        kref_init(&r->res_ref);

        /* With no directory, the master can be set immediately */
        if (dlm_no_directory(ls)) {
                int nodeid = dlm_dir_nodeid(r);
                if (nodeid == dlm_our_nodeid())
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }

        write_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
                write_unlock(&ls->ls_rsbtbl[bucket].lock);
                free_rsb(r);
                r = tmp;
                goto out;
        }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
        write_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
 out:
        *r_ret = r;
        return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
                 unsigned int flags, struct dlm_rsb **r_ret)
{
        return find_rsb(ls, name, namelen, flags, r_ret);
}
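/*
 * Lifecycle sketch (illustrative) of the toss-list caching described
 * above: a first lock request creates the rsb; when its last reference
 * is dropped it moves to the toss list rather than being freed; a new
 * request for the same name revives it, keeping the cached master value,
 * otherwise dlm_scan_rsbs()/shrink_bucket() eventually frees it and
 * removes its directory entry.
 */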
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
        kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;

        write_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
        write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;
        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb, *tmp;
        uint32_t lkid = 0;
        uint16_t bucket;

        lkb = allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);

        get_random_bytes(&bucket, sizeof(bucket));
        bucket &= (ls->ls_lkbtbl_size - 1);

        write_lock(&ls->ls_lkbtbl[bucket].lock);

        /* counter can roll over so we must verify lkid is not in use */

        while (lkid == 0) {
                lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
                                    lkb_idtbl_list) {
                        if (tmp->lkb_id != lkid)
                                continue;
                        lkid = 0;
                        break;
                }
        }

        lkb->lkb_id = lkid;
        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
        write_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
        uint16_t bucket = lkid & 0xFFFF;
        struct dlm_lkb *lkb;

        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
                if (lkb->lkb_id == lkid)
                        return lkb;
        }
        return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = lkid & 0xFFFF;

        if (bucket >= ls->ls_lkbtbl_size)
                return -EBADSLT;

        read_lock(&ls->ls_lkbtbl[bucket].lock);
        lkb = __find_lkb(ls, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        read_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}
static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint16_t bucket = lkb->lkb_id & 0xFFFF;

        write_lock(&ls->ls_lkbtbl[bucket].lock);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
                list_del(&lkb->lkb_idtbl_list);
                write_unlock(&ls->ls_lkbtbl[bucket].lock);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        free_lvb(lkb->lkb_lvbptr);
                free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
                return 0;
        }
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        int rv;
        rv = kref_put(&lkb->lkb_ref, kill_lkb);
        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL;

        list_for_each_entry(lkb, head, lkb_statequeue)
                if (lkb->lkb_rqmode < mode)
                        break;

        if (!lkb)
                list_add_tail(new, head);
        else
                __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
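/*
 * Example (illustrative): with this ordering a grant queue holding
 * EX, PW, NL entries stays sorted strongest-first, so a new PR entry
 * is inserted after the PW entry and ahead of the NL entry.
 */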
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        mutex_lock(&ls->ls_waiters_mutex);
        if (lkb->lkb_wait_type) {
                log_print("add_to_waiters error %d", lkb->lkb_wait_type);
                goto out;
        }
        lkb->lkb_wait_type = mstype;
        kref_get(&lkb->lkb_ref);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        mutex_unlock(&ls->ls_waiters_mutex);
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
        int error = 0;

        if (!lkb->lkb_wait_type) {
                log_print("remove_from_waiters error");
                error = -EINVAL;
                goto out;
        }
        lkb->lkb_wait_type = 0;
        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        list_del(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
 out:
        return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
static void dir_remove(struct dlm_rsb *r)
{
        int to_nodeid;

        if (dlm_no_directory(r->res_ls))
                return;

        to_nodeid = dlm_dir_nodeid(r);
        if (to_nodeid != dlm_our_nodeid())
                send_remove(r);
        else
                dlm_dir_remove_entry(r->res_ls, to_nodeid,
                                     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct dlm_rsb *r;
        int count = 0, found;

        for (;;) {
                found = 0;
                write_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                if (!found) {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
                        write_unlock(&ls->ls_rsbtbl[b].lock);

                        if (is_master(r))
                                dir_remove(r);
                        free_rsb(r);
                        count++;
                } else {
                        write_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        if (dlm_locking_stopped(ls))
                return;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                cond_resched();
        }
}
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int b, len = r->res_ls->ls_lvblen;

        /* b=1 lvb returned to caller
           b=0 lvb written to rsb or invalidated
           b=-1 do nothing */

        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

        if (b == 1) {
                if (!lkb->lkb_lvbptr)
                        return;
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;
                if (!r->res_lvbptr)
                        return;

                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
                lkb->lkb_lvbseq = r->res_lvbseq;

        } else if (b == 0) {
                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                        rsb_set_flag(r, RSB_VALNOTVALID);
                        return;
                }

                if (!lkb->lkb_lvbptr)
                        return;
                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        r->res_lvbptr = allocate_lvb(r->res_ls);
                if (!r->res_lvbptr)
                        return;

                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
                r->res_lvbseq++;
                lkb->lkb_lvbseq = r->res_lvbseq;
                rsb_clear_flag(r, RSB_VALNOTVALID);
        }

        if (rsb_flag(r, RSB_VALNOTVALID))
                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode < DLM_LOCK_PW)
                return;

        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!lkb->lkb_lvbptr)
                return;
        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        if (!r->res_lvbptr)
                r->res_lvbptr = allocate_lvb(r->res_ls);
        if (!r->res_lvbptr)
                return;

        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
        r->res_lvbseq++;
        rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                            struct dlm_message *ms)
{
        int b;

        if (!lkb->lkb_lvbptr)
                return;
        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_rqmode = DLM_LOCK_IV;

        switch (lkb->lkb_status) {
        case DLM_LKSTS_GRANTED:
                break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                break;
        case DLM_LKSTS_WAITING:
                del_lkb(r, lkb);
                lkb->lkb_grmode = DLM_LOCK_IV;
                /* this unhold undoes the original ref from create_lkb()
                   so this leads to the lkb being freed */
                unhold_lkb(lkb);
                break;
        default:
                log_print("invalid status for revert %d", lkb->lkb_status);
        }
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
                lkb->lkb_grmode = lkb->lkb_rqmode;
                if (lkb->lkb_status)
                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                else
                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
        }

        lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
        lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);
        if (is_master_copy(lkb))
                send_grant(r, lkb);
        else
                queue_cast(r, lkb, 0);
}
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return 1;
        return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
        struct dlm_lkb *this;

        list_for_each_entry(this, head, lkb_statequeue) {
                if (this == lkb)
                        continue;
                if (!modes_compat(this, lkb))
                        return 1;
        }
        return 0;
}
1084 * "A conversion deadlock arises with a pair of lock requests in the converting
1085 * queue for one resource. The granted mode of each lock blocks the requested
1086 * mode of the other lock."
1088 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1089 * convert queue from being granted, then demote lkb (set grmode to NL).
1090 * This second form requires that we check for conv-deadlk even when
1091 * now == 0 in _can_be_granted().
1094 * Granted Queue: empty
1095 * Convert Queue: NL->EX (first lock)
1096 * PR->EX (second lock)
1098 * The first lock can't be granted because of the granted mode of the second
1099 * lock and the second lock can't be granted because it's not first in the
1100 * list. We demote the granted mode of the second lock (the lkb passed to this
1103 * After the resolution, the "grant pending" function needs to go back and try
1104 * to grant locks on the convert queue again since the first lock can now be
1108 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1110 struct dlm_lkb *this, *first = NULL, *self = NULL;
1112 list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1120 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1124 /* if lkb is on the convert queue and is preventing the first
1125 from being granted, then there's deadlock and we demote lkb.
1126 multiple converting locks may need to do this before the first
1127 converting lock can be granted. */
1129 if (self && self != first) {
1130 if (!modes_compat(lkb, first) &&
1131 !queue_conflict(&rsb->res_grantqueue, first))
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

        /*
         * 6-10: Version 5.4 introduced an option to address the phenomenon of
         * a new request for a NL mode lock being blocked.
         *
         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
         * request, then it would be granted.  In essence, the use of this flag
         * tells the Lock Manager to expedite this request by not considering
         * what may be in the CONVERTING or WAITING queues...  As of this
         * writing, the EXPEDITE flag can be used only with new requests for NL
         * mode locks.  This flag is not valid for conversion requests.
         *
         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
         * conversion or used with a non-NL requested mode.  We also know an
         * EXPEDITE request is always granted immediately, so now must always
         * be 1.  The full condition to grant an expedite request: (now &&
         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
         * therefore be shortened to just checking the flag.
         */

        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
                return 1;

        /*
         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
         * added to the remaining conditions.
         */

        if (queue_conflict(&r->res_grantqueue, lkb))
                goto out;

        /*
         * 6-3: By default, a conversion request is immediately granted if the
         * requested mode is compatible with the modes of all other granted
         * locks.
         */

        if (queue_conflict(&r->res_convertqueue, lkb))
                goto out;

        /*
         * 6-5: But the default algorithm for deciding whether to grant or
         * queue conversion requests does not by itself guarantee that such
         * requests are serviced on a "first come first serve" basis.  This, in
         * turn, can lead to a phenomenon known as "indefinite postponement".
         *
         * 6-7: This issue is dealt with by using the optional QUECVT flag with
         * the system service employed to request a lock conversion.  This flag
         * forces certain conversion requests to be queued, even if they are
         * compatible with the granted modes of other locks on the same
         * resource.  Thus, the use of this flag results in conversion requests
         * being ordered on a "first come first serve" basis.
         *
         * DCT: This condition is all about new conversions being able to occur
         * "in place" while the lock remains on the granted queue (assuming
         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
         * doesn't _have_ to go onto the convert queue where it's processed in
         * order.  The "now" variable is necessary to distinguish converts
         * being received and processed for the first time now, because once a
         * convert is moved to the conversion queue the condition below applies
         * requiring fifo granting.
         */

        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
                return 1;

        /*
         * The NOORDER flag is set to avoid the standard vms rules on grant
         * order.
         */

        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
                return 1;

        /*
         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
         * granted until all other conversion requests ahead of it are granted
         * and/or canceled.
         */

        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
                return 1;

        /*
         * 6-4: By default, a new request is immediately granted only if all
         * three of the following conditions are satisfied when the request is
         * issued:
         * - The queue of ungranted conversion requests for the resource is
         *   empty.
         * - The queue of ungranted new requests for the resource is empty.
         * - The mode of the new request is compatible with the most
         *   restrictive mode of all granted locks on the resource.
         */

        if (now && !conv && list_empty(&r->res_convertqueue) &&
            list_empty(&r->res_waitqueue))
                return 1;

        /*
         * 6-4: Once a lock request is in the queue of ungranted new requests,
         * it cannot be granted until the queue of ungranted conversion
         * requests is empty, all ungranted new requests ahead of it are
         * granted and/or canceled, and it is compatible with the granted mode
         * of the most restrictive lock granted on the resource.
         */

        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;

        /*
         * The following, enabled by CONVDEADLK, departs from VMS.
         */

        if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
            conversion_deadlock_detect(r, lkb)) {
                lkb->lkb_grmode = DLM_LOCK_NL;
                lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
                return 1;
        }

 out:
        return 0;
}

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
        uint32_t flags = lkb->lkb_exflags;
        int rv;
        int8_t alt = 0, rqmode = lkb->lkb_rqmode;

        rv = _can_be_granted(r, lkb, now);
        if (rv)
                goto out;

        if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
                goto out;

        if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
                alt = DLM_LOCK_PR;
        else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
                alt = DLM_LOCK_CW;

        if (alt) {
                lkb->lkb_rqmode = alt;
                rv = _can_be_granted(r, lkb, now);
                if (rv)
                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                else
                        lkb->lkb_rqmode = rqmode;
        }
 out:
        return rv;
}
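/*
 * Example (illustrative): a caller requesting CW with DLM_LKF_ALTPR set
 * gets a second chance as PR if the CW request can't be granted; on
 * success the substitution is reported via DLM_SBF_ALTMODE in sb_flags.
 */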
static int grant_pending_convert(struct dlm_rsb *r, int high)
{
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;

        quit = 0;
 restart:
        grant_restart = 0;
        demote_restart = 0;
        hi = DLM_LOCK_IV;

        list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
                demoted = is_demoted(lkb);
                if (can_be_granted(r, lkb, 0)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
                } else {
                        hi = max_t(int, lkb->lkb_rqmode, hi);
                        if (!demoted && is_demoted(lkb))
                                demote_restart = 1;
                }
        }

        if (grant_restart)
                goto restart;
        if (demote_restart && !quit) {
                quit = 1;
                goto restart;
        }

        return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
        struct dlm_lkb *lkb, *s;

        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
                if (can_be_granted(r, lkb, 0))
                        grant_lock_pending(r, lkb);
                else
                        high = max_t(int, lkb->lkb_rqmode, high);
        }

        return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;

        DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

        high = grant_pending_convert(r, high);
        high = grant_pending_wait(r, high);

        if (high == DLM_LOCK_IV)
                return;

        /*
         * If there are locks left on the wait/convert queue then send blocking
         * ASTs to granted locks based on the largest requested mode (high)
         * found above. FIXME: highbast < high comparison not valid for PR/CW.
         */

        list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
                if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
                    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
                        queue_bast(r, lkb, high);
                        lkb->lkb_highbast = high;
                }
        }
}
static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
                            struct dlm_lkb *lkb)
{
        struct dlm_lkb *gr;

        list_for_each_entry(gr, head, lkb_statequeue) {
                if (gr->lkb_bastaddr &&
                    gr->lkb_highbast < lkb->lkb_rqmode &&
                    !modes_compat(gr, lkb)) {
                        queue_bast(r, gr, lkb->lkb_rqmode);
                        gr->lkb_highbast = lkb->lkb_rqmode;
                }
        }
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        send_bast_queue(r, &r->res_grantqueue, lkb);
        send_bast_queue(r, &r->res_convertqueue, lkb);
}
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = r->res_ls;
        int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

        if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = lkb->lkb_id;
                lkb->lkb_nodeid = r->res_nodeid;
                return 0;
        }

        if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
                list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
                return 1;
        }

        if (r->res_nodeid == 0) {
                lkb->lkb_nodeid = 0;
                return 0;
        }

        if (r->res_nodeid > 0) {
                lkb->lkb_nodeid = r->res_nodeid;
                return 0;
        }

        DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

        dir_nodeid = dlm_dir_nodeid(r);

        if (dir_nodeid != our_nodeid) {
                r->res_first_lkid = lkb->lkb_id;
                send_lookup(r, lkb);
                return 1;
        }

        for (;;) {
                /* It's possible for dlm_scand to remove an old rsb for
                   this same resource from the toss list, us to create
                   a new one, look up the master locally, and find it
                   already exists just before dlm_scand does the
                   dir_remove() on the previous rsb. */

                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
                                       r->res_length, &ret_nodeid);
                if (!error)
                        break;
                log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
                schedule();
        }

        if (ret_nodeid == our_nodeid) {
                r->res_first_lkid = 0;
                r->res_nodeid = 0;
                lkb->lkb_nodeid = 0;
        } else {
                r->res_first_lkid = lkb->lkb_id;
                r->res_nodeid = ret_nodeid;
                lkb->lkb_nodeid = ret_nodeid;
        }
        return 0;
}
static void process_lookup_list(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb, *safe;

        list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
                list_del(&lkb->lkb_rsb_lookup);
                _request_lock(r, lkb);
                schedule();
        }
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
        struct dlm_lkb *lkb;

        if (!r->res_first_lkid)
                return;

        switch (error) {
        case 0:
        case -EINPROGRESS:
                r->res_first_lkid = 0;
                process_lookup_list(r);
                break;

        case -EAGAIN:
                /* the remote master didn't queue our NOQUEUE request;
                   make a waiting lkb the first_lkid */

                r->res_first_lkid = 0;

                if (!list_empty(&r->res_lookup)) {
                        lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
                                         lkb_rsb_lookup);
                        list_del(&lkb->lkb_rsb_lookup);
                        r->res_first_lkid = lkb->lkb_id;
                        _request_lock(r, lkb);
                } else
                        r->res_nodeid = -1;
                break;

        default:
                log_error(r->res_ls, "confirm_master unknown error %d", error);
        }
}
static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
                         int namelen, uint32_t parent_lkid, void *ast,
                         void *astarg, void *bast, struct dlm_args *args)
{
        int rv = -EINVAL;

        /* check for invalid arg usage */

        if (mode < 0 || mode > DLM_LOCK_EX)
                goto out;

        if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
                goto out;

        if (flags & DLM_LKF_CANCEL)
                goto out;

        if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
                goto out;

        if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
                goto out;

        if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
                goto out;

        if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
                goto out;

        if (!ast || !lksb)
                goto out;

        if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
                goto out;

        /* parent/child locks not yet supported */
        if (parent_lkid)
                goto out;

        if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
                goto out;

        /* these args will be copied to the lkb in validate_lock_args,
           it cannot be done now because when converting locks, fields in
           an active lkb cannot be modified before locking the rsb */

        args->flags = flags;
        args->astaddr = ast;
        args->astparam = (long) astarg;
        args->bastaddr = bast;
        args->mode = mode;
        args->lksb = lksb;
        rv = 0;
 out:
        return rv;
}
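/*
 * Examples (illustrative, hypothetical argument combinations): each of
 * these would be rejected by the checks above with -EINVAL before any
 * lkb state is touched.
 *
 *      flags = DLM_LKF_EXPEDITE with mode = DLM_LOCK_EX
 *              (EXPEDITE is only valid for new NL requests)
 *      flags = DLM_LKF_QUECVT without DLM_LKF_CONVERT
 *      flags = DLM_LKF_VALBLK with lksb->sb_lvbptr == NULL
 */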
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
        if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
                      DLM_LKF_FORCEUNLOCK))
                return -EINVAL;

        args->flags = flags;
        args->astparam = (long) astarg;
        return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                              struct dlm_args *args)
{
        int rv = -EINVAL;

        if (args->flags & DLM_LKF_CONVERT) {
                if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                        goto out;

                if (args->flags & DLM_LKF_QUECVT &&
                    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
                        goto out;

                rv = -EBUSY;
                if (lkb->lkb_status != DLM_LKSTS_GRANTED)
                        goto out;

                if (lkb->lkb_wait_type)
                        goto out;
        }

        lkb->lkb_exflags = args->flags;
        lkb->lkb_sbflags = 0;
        lkb->lkb_astaddr = args->astaddr;
        lkb->lkb_astparam = args->astparam;
        lkb->lkb_bastaddr = args->bastaddr;
        lkb->lkb_rqmode = args->mode;
        lkb->lkb_lksb = args->lksb;
        lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
        lkb->lkb_ownpid = (int) current->pid;
        rv = 0;
 out:
        return rv;
}
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
        int rv = -EINVAL;

        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                goto out;

        if (args->flags & DLM_LKF_FORCEUNLOCK)
                goto out_ok;

        if (args->flags & DLM_LKF_CANCEL &&
            lkb->lkb_status == DLM_LKSTS_GRANTED)
                goto out;

        if (!(args->flags & DLM_LKF_CANCEL) &&
            lkb->lkb_status != DLM_LKSTS_GRANTED)
                goto out;

        rv = -EBUSY;
        if (lkb->lkb_wait_type)
                goto out;

 out_ok:
        lkb->lkb_exflags = args->flags;
        lkb->lkb_sbflags = 0;
        lkb->lkb_astparam = args->astparam;
        rv = 0;
 out:
        return rv;
}
/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), do_cancel()
 * These are called on the master node for the given lock and
 * from the central locking logic.
 */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error = 0;

        if (can_be_granted(r, lkb, 1)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                goto out;
        }

        if (can_be_queued(lkb)) {
                error = -EINPROGRESS;
                add_lkb(r, lkb, DLM_LKSTS_WAITING);
                send_blocking_asts(r, lkb);
                goto out;
        }

        error = -EAGAIN;
        if (force_blocking_asts(lkb))
                send_blocking_asts_all(r, lkb);
        queue_cast(r, lkb, -EAGAIN);

 out:
        return error;
}

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error = 0;

        /* changing an existing lock may allow others to be granted */

        if (can_be_granted(r, lkb, 1)) {
                grant_lock(r, lkb);
                queue_cast(r, lkb, 0);
                grant_pending_locks(r);
                goto out;
        }

        if (can_be_queued(lkb)) {
                if (is_demoted(lkb))
                        grant_pending_locks(r);
                error = -EINPROGRESS;
                del_lkb(r, lkb);
                add_lkb(r, lkb, DLM_LKSTS_CONVERT);
                send_blocking_asts(r, lkb);
                goto out;
        }

        error = -EAGAIN;
        if (force_blocking_asts(lkb))
                send_blocking_asts_all(r, lkb);
        queue_cast(r, lkb, -EAGAIN);

 out:
        return error;
}

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        remove_lock(r, lkb);
        queue_cast(r, lkb, -DLM_EUNLOCK);
        grant_pending_locks(r);
        return -DLM_EUNLOCK;
}

/* FIXME: if revert_lock() finds that the lkb is granted, we should
   skip the queue_cast(ECANCEL).  It indicates that the request/convert
   completed (and queued a normal ast) just before the cancel; we don't
   want to clobber the sb_result for the normal ast with ECANCEL. */

static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        revert_lock(r, lkb);
        queue_cast(r, lkb, -DLM_ECANCEL);
        grant_pending_locks(r);
        return -DLM_ECANCEL;
}
/*
 * Four stage 3 varieties:
 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
 */

/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error;

        /* set_master: sets lkb nodeid from r */

        error = set_master(r, lkb);
        if (error < 0)
                goto out;
        if (error) {
                error = 0;
                goto out;
        }

        if (is_remote(r))
                /* receive_request() calls do_request() on remote node */
                error = send_request(r, lkb);
        else
                error = do_request(r, lkb);
 out:
        return error;
}

/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error;

        if (is_remote(r))
                /* receive_convert() calls do_convert() on remote node */
                error = send_convert(r, lkb);
        else
                error = do_convert(r, lkb);

        return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error;

        if (is_remote(r))
                /* receive_unlock() calls do_unlock() on remote node */
                error = send_unlock(r, lkb);
        else
                error = do_unlock(r, lkb);

        return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error;

        if (is_remote(r))
                /* receive_cancel() calls do_cancel() on remote node */
                error = send_cancel(r, lkb);
        else
                error = do_cancel(r, lkb);

        return error;
}
/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
                        int len, struct dlm_args *args)
{
        struct dlm_rsb *r;
        int error;

        error = validate_lock_args(ls, lkb, args);
        if (error)
                goto out;

        error = find_rsb(ls, name, len, R_CREATE, &r);
        if (error)
                goto out;

        lock_rsb(r);

        attach_lkb(r, lkb);
        lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

        error = _request_lock(r, lkb);

        unlock_rsb(r);
        put_rsb(r);
 out:
        return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
                        struct dlm_args *args)
{
        struct dlm_rsb *r;
        int error;

        r = lkb->lkb_resource;

        hold_rsb(r);
        lock_rsb(r);

        error = validate_lock_args(ls, lkb, args);
        if (error)
                goto out;

        error = _convert_lock(r, lkb);
 out:
        unlock_rsb(r);
        put_rsb(r);
        return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
                       struct dlm_args *args)
{
        struct dlm_rsb *r;
        int error;

        r = lkb->lkb_resource;

        hold_rsb(r);
        lock_rsb(r);

        error = validate_unlock_args(lkb, args);
        if (error)
                goto out;

        error = _unlock_lock(r, lkb);
 out:
        unlock_rsb(r);
        put_rsb(r);
        return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
                       struct dlm_args *args)
{
        struct dlm_rsb *r;
        int error;

        r = lkb->lkb_resource;

        hold_rsb(r);
        lock_rsb(r);

        error = validate_unlock_args(lkb, args);
        if (error)
                goto out;

        error = _cancel_lock(r, lkb);
 out:
        unlock_rsb(r);
        put_rsb(r);
        return error;
}
/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
             int mode,
             struct dlm_lksb *lksb,
             uint32_t flags,
             void *name,
             unsigned int namelen,
             uint32_t parent_lkid,
             void (*ast) (void *astarg),
             void *astarg,
             void (*bast) (void *astarg, int mode))
{
        struct dlm_ls *ls;
        struct dlm_lkb *lkb;
        struct dlm_args args;
        int error, convert = flags & DLM_LKF_CONVERT;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;

        lock_recovery(ls);

        if (convert)
                error = find_lkb(ls, lksb->sb_lkid, &lkb);
        else
                error = create_lkb(ls, &lkb);

        if (error)
                goto out;

        error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
                              astarg, bast, &args);
        if (error)
                goto out_put;

        if (convert)
                error = convert_lock(ls, lkb, &args);
        else
                error = request_lock(ls, lkb, name, namelen, &args);

        if (error == -EINPROGRESS)
                error = 0;
 out_put:
        if (convert || error)
                __put_lkb(ls, lkb);
        if (error == -EAGAIN)
                error = 0;
 out:
        unlock_recovery(ls);
        dlm_put_lockspace(ls);
        return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
               uint32_t lkid,
               uint32_t flags,
               struct dlm_lksb *lksb,
               void *astarg)
{
        struct dlm_ls *ls;
        struct dlm_lkb *lkb;
        struct dlm_args args;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;

        lock_recovery(ls);

        error = find_lkb(ls, lkid, &lkb);
        if (error)
                goto out;

        error = set_unlock_args(flags, astarg, &args);
        if (error)
                goto out_put;

        if (flags & DLM_LKF_CANCEL)
                error = cancel_lock(ls, lkb, &args);
        else
                error = unlock_lock(ls, lkb, &args);

        if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
                error = 0;
 out_put:
        dlm_put_lkb(lkb);
 out:
        unlock_recovery(ls);
        dlm_put_lockspace(ls);
        return error;
}
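/*
 * Usage sketch (illustrative; my_ast/my_bast/my_arg are hypothetical
 * caller-supplied callbacks and state, not names from this file):
 *
 *      struct dlm_lksb lksb;
 *      int error;
 *
 *      error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
 *                       my_ast, my_arg, my_bast);
 *
 * A return of 0 means the request is in progress; my_ast() later runs
 * with lksb.sb_status holding the result and lksb.sb_lkid identifying
 * the lock.  A conversion reuses that lkid with DLM_LKF_CONVERT, and
 * the lock is dropped with:
 *
 *      error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
 */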
/*
 * send/receive routines for remote operations and replies
 *
 * send_request                 receive_request
 * send_convert                 receive_convert
 * send_unlock                  receive_unlock
 * send_cancel                  receive_cancel
 * send_grant                   receive_grant
 * send_bast                    receive_bast
 * send_lookup                  receive_lookup
 * send_remove                  receive_remove
 *
 * receive_request_reply        send_request_reply
 * receive_convert_reply        send_convert_reply
 * receive_unlock_reply         send_unlock_reply
 * receive_cancel_reply         send_cancel_reply
 * receive_lookup_reply         send_lookup_reply
 */
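/*
 * Example flow (illustrative): a request for a resource mastered on a
 * remote node, following the stages described at the top of this file.
 *
 *      local node                        master node
 *      ----------                        -----------
 *      dlm_lock()
 *        request_lock()
 *          _request_lock()
 *            send_request()        ->    receive_request()
 *            (lkb on ls_waiters)           do_request()
 *      receive_request_reply()     <-      send_request_reply()
 *        queue_cast() or add_lkb()
 */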
static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          int to_nodeid, int mstype,
                          struct dlm_message **ms_ret,
                          struct dlm_mhandle **mh_ret)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        char *mb;
        int mb_len = sizeof(struct dlm_message);

        switch (mstype) {
        case DLM_MSG_REQUEST:
        case DLM_MSG_LOOKUP:
        case DLM_MSG_REMOVE:
                mb_len += r->res_length;
                break;
        case DLM_MSG_CONVERT:
        case DLM_MSG_UNLOCK:
        case DLM_MSG_REQUEST_REPLY:
        case DLM_MSG_CONVERT_REPLY:
        case DLM_MSG_GRANT:
                if (lkb && lkb->lkb_lvbptr)
                        mb_len += r->res_ls->ls_lvblen;
                break;
        }

        /* get_buffer gives us a message handle (mh) that we need to
           pass into lowcomms_commit and a message buffer (mb) that we
           write our data into */

        mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
        if (!mh)
                return -ENOBUFS;

        memset(mb, 0, mb_len);

        ms = (struct dlm_message *) mb;

        ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
        ms->m_header.h_lockspace = r->res_ls->ls_global_id;
        ms->m_header.h_nodeid = dlm_our_nodeid();
        ms->m_header.h_length = mb_len;
        ms->m_header.h_cmd = DLM_MSG;

        ms->m_type = mstype;

        *mh_ret = mh;
        *ms_ret = ms;
        return 0;
}

/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
        dlm_message_out(ms);
        dlm_lowcomms_commit_buffer(mh);
        return 0;
}
static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
                      struct dlm_message *ms)
{
        ms->m_nodeid   = lkb->lkb_nodeid;
        ms->m_pid      = lkb->lkb_ownpid;
        ms->m_lkid     = lkb->lkb_id;
        ms->m_remid    = lkb->lkb_remid;
        ms->m_exflags  = lkb->lkb_exflags;
        ms->m_sbflags  = lkb->lkb_sbflags;
        ms->m_flags    = lkb->lkb_flags;
        ms->m_lvbseq   = lkb->lkb_lvbseq;
        ms->m_status   = lkb->lkb_status;
        ms->m_grmode   = lkb->lkb_grmode;
        ms->m_rqmode   = lkb->lkb_rqmode;
        ms->m_hash     = r->res_hash;

        /* m_result and m_bastmode are set from function args,
           not from lkb fields */

        if (lkb->lkb_bastaddr)
                ms->m_asts |= AST_BAST;
        if (lkb->lkb_astaddr)
                ms->m_asts |= AST_COMP;

        /* compare with switch in create_message; send_remove() doesn't
           use send_args() */

        switch (ms->m_type) {
        case DLM_MSG_REQUEST:
        case DLM_MSG_LOOKUP:
                memcpy(ms->m_extra, r->res_name, r->res_length);
                break;
        case DLM_MSG_CONVERT:
        case DLM_MSG_UNLOCK:
        case DLM_MSG_REQUEST_REPLY:
        case DLM_MSG_CONVERT_REPLY:
        case DLM_MSG_GRANT:
                if (!lkb->lkb_lvbptr)
                        break;
                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
                break;
        }
}
static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        add_to_waiters(lkb, mstype);

        to_nodeid = r->res_nodeid;

        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
        if (error)
                goto fail;

        send_args(r, lkb, ms);

        error = send_message(mh, ms);
        if (error)
                goto fail;
        return 0;

 fail:
        remove_from_waiters(lkb);
        return error;
}

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return send_common(r, lkb, DLM_MSG_REQUEST);
}

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int error;

        error = send_common(r, lkb, DLM_MSG_CONVERT);

        /* down conversions go without a reply from the master */
        if (!error && down_conversion(lkb)) {
                remove_from_waiters(lkb);
                r->res_ls->ls_stub_ms.m_result = 0;
                r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
        }

        return error;
}
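/* Note (an assumption based on the code above): ls_stub_ms acts as a
   locally constructed stand-in for the DLM_MSG_CONVERT_REPLY that the
   master never sends for down-conversions, so the lkb is completed
   through the same __receive_convert_reply() path as a real reply. */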
/* FIXME: if this lkb is the only lock we hold on the rsb, then set
   MASTER_UNCERTAIN to force the next request on the rsb to confirm
   that the master is still correct. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return send_common(r, lkb, DLM_MSG_UNLOCK);
}

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return send_common(r, lkb, DLM_MSG_CANCEL);
}
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        to_nodeid = lkb->lkb_nodeid;

        error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
        if (error)
                goto out;

        send_args(r, lkb, ms);

        ms->m_result = 0;

        error = send_message(mh, ms);
 out:
        return error;
}

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        to_nodeid = lkb->lkb_nodeid;

        error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
        if (error)
                goto out;

        send_args(r, lkb, ms);

        ms->m_bastmode = mode;

        error = send_message(mh, ms);
 out:
        return error;
}

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        add_to_waiters(lkb, DLM_MSG_LOOKUP);

        to_nodeid = dlm_dir_nodeid(r);

        error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
        if (error)
                goto fail;

        send_args(r, lkb, ms);

        error = send_message(mh, ms);
        if (error)
                goto fail;
        return 0;

 fail:
        remove_from_waiters(lkb);
        return error;
}

static int send_remove(struct dlm_rsb *r)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        to_nodeid = dlm_dir_nodeid(r);

        error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
        if (error)
                goto out;

        memcpy(ms->m_extra, r->res_name, r->res_length);
        ms->m_hash = r->res_hash;

        error = send_message(mh, ms);
 out:
        return error;
}
static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                             int mstype, int rv)
{
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int to_nodeid, error;

        to_nodeid = lkb->lkb_nodeid;

        error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
        if (error)
                goto out;

        send_args(r, lkb, ms);

        ms->m_result = rv;

        error = send_message(mh, ms);
 out:
        return error;
}

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}

static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
                             int ret_nodeid, int rv)
{
        struct dlm_rsb *r = &ls->ls_stub_rsb;
        struct dlm_message *ms;
        struct dlm_mhandle *mh;
        int error, nodeid = ms_in->m_header.h_nodeid;

        error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
        if (error)
                goto out;

        ms->m_lkid = ms_in->m_lkid;
        ms->m_result = rv;
        ms->m_nodeid = ret_nodeid;

        error = send_message(mh, ms);
 out:
        return error;
}
/* which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything about
   the lkb for any type of message */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        lkb->lkb_exflags = ms->m_exflags;
        lkb->lkb_sbflags = ms->m_sbflags;
        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
                         (ms->m_flags & 0x0000FFFF);
}

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        lkb->lkb_sbflags = ms->m_sbflags;
        lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
                         (ms->m_flags & 0x0000FFFF);
}

static int receive_extralen(struct dlm_message *ms)
{
        return (ms->m_header.h_length - sizeof(struct dlm_message));
}

static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
                       struct dlm_message *ms)
{
        int len;

        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                if (!lkb->lkb_lvbptr)
                        lkb->lkb_lvbptr = allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
                len = receive_extralen(ms);
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
        }
        return 0;
}
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
{
        lkb->lkb_nodeid = ms->m_header.h_nodeid;
        lkb->lkb_ownpid = ms->m_pid;
        lkb->lkb_remid = ms->m_lkid;
        lkb->lkb_grmode = DLM_LOCK_IV;
        lkb->lkb_rqmode = ms->m_rqmode;
        lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
        lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

        DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

        if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
                /* lkb was just created so there won't be an lvb yet */
                lkb->lkb_lvbptr = allocate_lvb(ls);
                if (!lkb->lkb_lvbptr)
                        return -ENOMEM;
        }

        return 0;
}

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                                struct dlm_message *ms)
{
        if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
                log_error(ls, "convert_args nodeid %d %d lkid %x %x",
                          lkb->lkb_nodeid, ms->m_header.h_nodeid,
                          lkb->lkb_id, lkb->lkb_remid);
                return -EINVAL;
        }

        if (!is_master_copy(lkb))
                return -EINVAL;

        if (lkb->lkb_status != DLM_LKSTS_GRANTED)
                return -EBUSY;

        if (receive_lvb(ls, lkb, ms))
                return -ENOMEM;

        lkb->lkb_rqmode = ms->m_rqmode;
        lkb->lkb_lvbseq = ms->m_lvbseq;

        return 0;
}

static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
                               struct dlm_message *ms)
{
        if (!is_master_copy(lkb))
                return -EINVAL;
        if (receive_lvb(ls, lkb, ms))
                return -ENOMEM;
        return 0;
}

/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
   uses to send a reply and that the remote end uses to process the reply. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
        struct dlm_lkb *lkb = &ls->ls_stub_lkb;
        lkb->lkb_nodeid = ms->m_header.h_nodeid;
        lkb->lkb_remid = ms->m_lkid;
}
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error, namelen;

        error = create_lkb(ls, &lkb);
        if (error)
                goto fail;

        receive_flags(lkb, ms);
        lkb->lkb_flags |= DLM_IFL_MSTCPY;
        error = receive_request_args(ls, lkb, ms);
        if (error) {
                __put_lkb(ls, lkb);
                goto fail;
        }

        namelen = receive_extralen(ms);

        error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
        if (error) {
                __put_lkb(ls, lkb);
                goto fail;
        }

        lock_rsb(r);

        attach_lkb(r, lkb);
        error = do_request(r, lkb);
        send_request_reply(r, lkb, error);

        unlock_rsb(r);
        put_rsb(r);

        if (error == -EINPROGRESS)
                error = 0;
        if (error)
                dlm_put_lkb(lkb);
        return;

 fail:
        setup_stub_lkb(ls, ms);
        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error, reply = 1;

        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error)
                goto fail;

        r = lkb->lkb_resource;

        hold_rsb(r);
        lock_rsb(r);

        receive_flags(lkb, ms);
        error = receive_convert_args(ls, lkb, ms);
        if (error)
                goto out;
        reply = !down_conversion(lkb);

        error = do_convert(r, lkb);
 out:
        if (reply)
                send_convert_reply(r, lkb, error);

        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
        return;

 fail:
        setup_stub_lkb(ls, ms);
        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error;

        error = find_lkb(ls, ms->m_remid, &lkb);
        if (error)
                goto fail;

        r = lkb->lkb_resource;

        hold_rsb(r);
        lock_rsb(r);

        receive_flags(lkb, ms);
        error = receive_unlock_args(ls, lkb, ms);
        if (error)
                goto out;

        error = do_unlock(r, lkb);
 out:
        send_unlock_reply(r, lkb, error);

        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
        return;

 fail:
        setup_stub_lkb(ls, ms);
        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
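
/* grant and bast messages arrive at the process-copy side of a lock and
   have no reply, so a missing lkb can only be logged */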
static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're the master, so treat the lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, dir_nodeid, from_nodeid;

	from_nodeid = ms->m_header.h_nodeid;

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "remove dir entry dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		return;
	}

	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
}
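
/* The receive_xxxx_reply() handlers below run on the node that sent the
   original message; ms->m_result carries the value returned by do_xxxx()
   on the master (or by dlm_dir_lookup() on the dir node). */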
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_request_reply not on waiters");
		goto out;
	}

	/* this is the value returned from do_request() on the master */
	error = ms->m_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	switch (error) {
	case -EAGAIN:
		/* request would block (be queued) on remote master;
		   the unhold undoes the original ref from create_lkb()
		   so it leads to the lkb being freed */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb);
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (error)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, error);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;
		_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
 out:
	dlm_put_lkb(lkb);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	int error = ms->m_result;

	switch (error) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply error %d", error);
	}
}
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	__receive_convert_reply(r, lkb, ms);

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_convert_reply not on waiters");
		goto out;
	}

	_receive_convert_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error = ms->m_result;

	hold_rsb(r);
	lock_rsb(r);

	/* this is the value returned from do_unlock() on the master */

	switch (error) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply not on waiters");
		goto out;
	}

	_receive_unlock_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error = ms->m_result;

	hold_rsb(r);
	lock_rsb(r);

	/* this is the value returned from do_cancel() on the master */

	switch (error) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply error %d", error);
	}

	unlock_rsb(r);
	put_rsb(r);
}
static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply not on waiters");
		goto out;
	}

	_receive_cancel_reply(lkb, ms);
 out:
	dlm_put_lkb(lkb);
}
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	error = remove_from_waiters(lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply not on waiters");
		goto out;
	}

	/* this is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */
	error = ms->m_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	_request_lock(r, lkb);

	if (!ret_nodeid)
		process_lookup_list(r);

	unlock_rsb(r);
	put_rsb(r);
 out:
	dlm_put_lkb(lkb);
}
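
/* dlm_receive_message() is the entry point for all incoming dlm messages:
   it finds the lockspace, saves or defers messages that race with
   recovery via the requestqueue, then dispatches on the message type. */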
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				error = -EINTR;
				goto out;
			}
		}

		if (lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	return error;
}
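
/* Recovery: the functions below resolve, fake replies for, or flag for
   resend the operations that were waiting on replies from failed nodes. */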
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_remove_from_waiters(lkb);
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_remove_from_waiters(lkb);
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_remove_from_waiters(lkb);
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			rv = lkb->lkb_wait_type;
			_remove_from_waiters(lkb);
			lkb->lkb_flags &= ~DLM_IFL_RESEND;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!rv)
		lkb = NULL;
	*lkb_ret = lkb;
	return rv;
}
/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being
   placed back on the waiters list. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		mstype = remove_resend_waiter(ls, &lkb);
		if (!mstype)
			break;

		r = lkb->lkb_resource;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		switch (mstype) {

		case DLM_MSG_LOOKUP:
			hold_rsb(r);
			lock_rsb(r);
			_request_lock(r, lkb);
			if (is_master(r))
				confirm_master(r, 0);
			unlock_rsb(r);
			put_rsb(r);
			break;

		case DLM_MSG_REQUEST:
			hold_rsb(r);
			lock_rsb(r);
			_request_lock(r, lkb);
			if (is_master(r))
				confirm_master(r, 0);
			unlock_rsb(r);
			put_rsb(r);
			break;

		case DLM_MSG_CONVERT:
			hold_rsb(r);
			lock_rsb(r);
			_convert_lock(r, lkb);
			unlock_rsb(r);
			put_rsb(r);
			break;

		default:
			log_error(ls, "recover_waiters_post type %d", mstype);
		}
	}

	return error;
}
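
/* purge_queue() frees every lkb on one rsb queue that the test callback
   selects, and marks the rsb RSB_LOCKS_PURGED so dlm_grant_after_purge()
   can later grant any locks that were blocked by the purged ones. */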
static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}
static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}
/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	read_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	read_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}
void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}

		lock_rsb(r);
		grant_pending_locks(r);
		confirm_master(r, 0);
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}
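
/* Recovery: rebuilding master-copy locks from the rcom lock messages
   that lock holders send to the new master */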
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to do that itself */

	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		dlm_put_lkb(lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
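
/* User-level locking: the dlm_user_* functions below carry out lock
   operations requested from userspace; the dlm_user_args (ua) attached
   to the lkb as lkb_astparam holds what's needed to deliver asts back
   to the user process. */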
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     uint32_t parent_lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			dlm_put_lkb(lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		dlm_put_lkb(lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		dlm_put_lkb(lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	kref_get(&lkb->lkb_ref);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	unlock_recovery(ls);
	return error;
}
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
			      ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	if (error)
		goto out_put;

	/* this lkb was removed from the WAITING queue */
	if (lkb->lkb_grmode == DLM_LOCK_IV) {
		spin_lock(&ua->proc->locks_spin);
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
		spin_unlock(&ua->proc->locks_spin);
	}
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
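
#if 0
/* Illustrative sketch only, never built: the expected calling sequence
   for the dlm_user_* functions above.  In practice the dlm character
   device's write handler unpacks ua/ua_tmp from a userspace buffer; the
   helper name and the way the args are obtained here are hypothetical. */
static int example_user_sequence(struct dlm_ls *ls, struct dlm_user_args *ua,
				 struct dlm_user_args *ua_tmp)
{
	uint32_t lkid;
	int error;

	/* take a new PR lock on "myres"; on success the lkb is added to
	   the per-process locks list and ua is owned by the lkb */
	error = dlm_user_request(ls, ua, DLM_LOCK_PR, 0, "myres", 5, 0);
	if (error)
		return error;
	lkid = ua->lksb.sb_lkid;

	/* convert it up to EX; ua_tmp carries the new ast params and is
	   freed by dlm_user_convert() itself */
	return dlm_user_convert(ls, ua_tmp, DLM_LOCK_EX, 0, lkid, NULL);
}
#endif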
static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (ua->lksb.sb_lvbptr)
		kfree(ua->lksb.sb_lvbptr);
	kfree(ua);
	lkb->lkb_astparam = (long)NULL;

	/* TODO: propagate to master if needed */
	return 0;
}
/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	/* FIXME: we need to handle the case where the lkb is in limbo
	   while the rsb is being looked up, currently we assert in
	   _unlock_lock/is_remote because rsb nodeid is -1. */

	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}
/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourselves if they are not */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);
	mutex_lock(&ls->ls_clear_proc_locks);

	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);

		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
			lkb->lkb_flags |= DLM_IFL_ORPHAN;
			orphan_proc_lock(ls, lkb);
		} else {
			lkb->lkb_flags |= DLM_IFL_DEAD;
			unlock_proc_lock(ls, lkb);
		}

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}