dlm: remove deadlock debug print
[linux-2.6-block.git] / fs / dlm / lock.c
CommitLineData
e7fd4179
DT
1/******************************************************************************
2*******************************************************************************
3**
7fe2b319 4** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
e7fd4179
DT
5**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
597d0cae 58#include <linux/types.h>
5a0e3ad6 59#include <linux/slab.h>
e7fd4179 60#include "dlm_internal.h"
597d0cae 61#include <linux/dlm_device.h>
e7fd4179
DT
62#include "memory.h"
63#include "lowcomms.h"
64#include "requestqueue.h"
65#include "util.h"
66#include "dir.h"
67#include "member.h"
68#include "lockspace.h"
69#include "ast.h"
70#include "lock.h"
71#include "rcom.h"
72#include "recover.h"
73#include "lvb_table.h"
597d0cae 74#include "user.h"
e7fd4179
DT
75#include "config.h"
76
77static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84static int send_remove(struct dlm_rsb *r);
85static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
3ae1acf9 86static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
e7fd4179
DT
87static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88 struct dlm_message *ms);
89static int receive_extralen(struct dlm_message *ms);
8499137d 90static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
3ae1acf9 91static void del_timeout(struct dlm_lkb *lkb);
e7fd4179
DT
92
93/*
94 * Lock compatibilty matrix - thanks Steve
95 * UN = Unlocked state. Not really a state, used as a flag
96 * PD = Padding. Used to make the matrix a nice power of two in size
97 * Other states are the same as the VMS DLM.
98 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
99 */
100
/* 1 = the two modes are compatible (may be held concurrently), 0 = conflict.
   Row is granted mode + 1, column is requested mode + 1 (IV is -1). */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
112
113/*
114 * This defines the direction of transfer of LVB data.
115 * Granted mode is the row; requested mode is the column.
116 * Usage: matrix[grmode+1][rqmode+1]
117 * 1 = LVB is returned to the caller
118 * 0 = LVB is written to the resource
119 * -1 = nothing happens to the LVB
120 */
121
/* LVB transfer direction per (granted, requested) mode pair; see the
   comment above: 1 = return LVB to caller, 0 = write LVB to resource,
   -1 = leave the LVB alone. */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
e7fd4179
DT
133
134#define modes_compat(gr, rq) \
135 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137int dlm_modes_compat(int mode1, int mode2)
138{
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140}
141
142/*
143 * Compatibility matrix for conversions with QUECVT set.
144 * Granted mode is the row; requested mode is the column.
145 * Usage: matrix[grmode+1][rqmode+1]
146 */
147
/* 1 = a QUECVT conversion from the row (granted) mode to the column
   (requested) mode is permitted, 0 = it is not. */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
159
/* Dump an lkb's identifying fields and state to the console; used by
   DLM_ASSERT failure paths and debug code. */

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}
168
/* Dump an rsb's core fields to the console for debugging/asserts. */

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
175
a345da3e
DT
/* Dump an rsb plus every lkb on each of its queues (lookup, grant,
   convert, wait); used by DLM_ASSERT failure paths. */

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
197
e7fd4179
DT
198/* Threads cannot use the lockspace while it's being recovered */
199
/* Take a shared hold on the lockspace; blocks while recovery (which
   takes ls_in_recovery for write) is running. */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

/* Release the shared hold taken by dlm_lock_recovery(). */

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

/* Non-blocking variant: returns nonzero if the shared hold was taken,
   0 if recovery currently holds the semaphore for write. */

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
214
/* True unless the caller asked for a non-blocking request (NOQUEUE). */

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

/* True if blocking asts should be sent even for a NOQUEUE request. */

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

/* True if the master demoted the lock's granted mode (status bit). */

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

/* True if the lock was granted in its alternate mode (status bit). */

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

/* True if the lkb sits on the resource's grant queue. */

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

/* True if the resource is mastered on another node (res_nodeid > 0);
   res_nodeid must already be resolved (>= 0). */

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

/* True for the local (process) copy of a lock mastered remotely. */

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

/* True for the master's copy of a lock owned by a remote node; a
   master copy must always have a nonzero owner nodeid. */

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

/* True for a PR<->CW conversion: the only pair where neither mode
   strictly dominates the other, so it needs special handling. */

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

/* True when converting to a strictly weaker mode (always grantable). */

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

/* True if a force-unlock overlaps an operation still in progress. */

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

/* True if a cancel overlaps an operation still in progress. */

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

/* True if either kind of overlapping operation is pending. */

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
286
e7fd4179
DT
/* Queue a completion ast for the lock's owner with result rv.  No-op for
   master copies (the owning node queues its own cast).  Rewrites the
   result for cancels that were triggered internally: -ETIMEDOUT for a
   timeout-driven cancel, -EDEADLK for a deadlock-driven cancel. */

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* likewise, a cancel issued to break a deadlock reports -EDEADLK */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}
310
ef0c2bb0
DT
311static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312{
313 queue_cast(r, lkb,
314 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315}
316
e7fd4179
DT
/* Deliver a blocking ast for rqmode: for a master copy it is sent to the
   owning node, for a local lock it is queued for the local owner. */

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}
325
326/*
327 * Basic operations on rsb's and lkb's
328 */
329
3881ac04
DT
/* Top up the lockspace's preallocated rsb pool (ls_new_rsb) so that a
   later get_rsb_struct() can take one without allocating under a
   spinlock.  Allocates two at a time once the pool drops to half of
   ci_new_rsb_count.  Returns 0 if the pool is non-empty afterwards,
   -ENOMEM if it is empty and both allocations failed. */

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	/* allocate outside the spinlock; dlm_allocate_rsb may sleep */
	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}
361
362/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363 unlock any spinlocks, go back and call pre_rsb_struct again.
364 Otherwise, take an rsb off the list and return it. */
365
/* Take a preallocated rsb off ls_new_rsb and initialize it with the
   given name.  Returns 0 with *r_ret set, or -EAGAIN if the pool is
   empty (see comment above: caller drops its spinlocks, refills via
   pre_rsb_struct and retries). */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_hashchain);
	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}
402
403static int search_rsb_list(struct list_head *head, char *name, int len,
404 unsigned int flags, struct dlm_rsb **r_ret)
405{
406 struct dlm_rsb *r;
407 int error = 0;
408
409 list_for_each_entry(r, head, res_hashchain) {
410 if (len == r->res_length && !memcmp(name, r->res_name, len))
411 goto found;
412 }
18c60c0a 413 *r_ret = NULL;
597d0cae 414 return -EBADR;
e7fd4179
DT
415
416 found:
417 if (r->res_nodeid && (flags & R_MASTER))
418 error = -ENOTBLK;
419 *r_ret = r;
420 return error;
421}
422
/* Search bucket b for an rsb: first the active list (taking a new ref
   on a hit), then the toss list (moving a hit back to the active list
   with its kref_init'd ref of 1).  When revived from the toss list in
   a lockspace with a directory, the cached master value may be stale,
   so mark RSB_MASTER_UNCERTAIN for a remote master and reset
   res_first_lkid.  Caller holds the bucket lock. */

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
457
e7fd4179
DT
458/*
459 * Find rsb in rsbtbl and potentially create/add one
460 *
461 * Delaying the release of rsb's has a similar benefit to applications keeping
462 * NL locks on an rsb, but without the guarantee that the cached master value
463 * will still be valid when the rsb is reused. Apps aren't always smart enough
464 * to keep NL locks on an rsb that they may lock again shortly; this can lead
465 * to excessive master lookups and removals if we don't delay the release.
466 *
467 * Searching for an rsb means looking through both the normal list and toss
468 * list. When found on the toss list the rsb is moved to the normal list with
469 * ref count of 1; when found on normal list the ref count is incremented.
470 */
471
/* Look up (and with R_CREATE, create) the rsb for a name; see the
   comment above for the active/toss list semantics.  Preallocates rsb
   structs before taking the bucket lock (pre_rsb_struct), and retries
   the whole sequence if get_rsb_struct() races to an empty pool
   (-EAGAIN).  Returns 0 with *r_ret referenced, or a negative error
   (-EINVAL, -EBADR, -ENOTBLK, -ENOMEM). */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, bucket;
	int error;

	if (namelen > DLM_RESNAME_MAXLEN) {
		error = -EINVAL;
		goto out;
	}

	/* without a directory every node can resolve the master itself,
	   so a lookup may always create the rsb */
	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

 retry:
	if (flags & R_CREATE) {
		/* refill the preallocation pool before taking the lock */
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);

	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out_unlock;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out_unlock;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out_unlock;

	error = get_rsb_struct(ls, name, namelen, &r);
	if (error == -EAGAIN) {
		/* pool emptied under us: drop the lock, refill, retry */
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	error = 0;
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 out:
	*r_ret = r;
	return error;
}
538
e7fd4179
DT
539/* This is only called to add a reference when the code already holds
540 a valid reference to the rsb, so there's no need for locking. */
541
/* Take an extra reference; caller already holds a valid reference
   (see comment above), so no locking is required. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

/* Exported wrapper around hold_rsb(). */

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

/* kref release function: move the now-unreferenced rsb to its bucket's
   toss list (ref re-initialized to 1 for the toss list's ownership),
   stamp the toss time, and drop the LVB buffer. */

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
566
25985edc 567/* When all references to the rsb are gone it's transferred to
e7fd4179
DT
568 the tossed list for later disposal. */
569
/* Drop a reference under the bucket lock; the final put moves the rsb
   to the toss list via toss_rsb(). */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

/* Exported wrapper around put_rsb(). */

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb: drop a reference that is known not to be
   the last one, so no bucket lock is needed; asserts it wasn't last. */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
593
/* kref release function used when an rsb is finally freed: only sanity
   checks that every queue is empty; the actual remove/free happens in
   the caller after kref_put() returns (see comment below). */

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
608
609/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
610 The rsb must exist as long as any lkb's for it do. */
611
/* Bind an lkb to its rsb, taking an rsb reference so the rsb outlives
   the lkb (see comment above). */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

/* Undo attach_lkb(): drop the rsb reference and clear the back-pointer.
   Safe to call on an lkb that was never attached. */

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
625
626static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
627{
3d6aa675
DT
628 struct dlm_lkb *lkb;
629 int rv, id;
e7fd4179 630
52bda2b5 631 lkb = dlm_allocate_lkb(ls);
e7fd4179
DT
632 if (!lkb)
633 return -ENOMEM;
634
635 lkb->lkb_nodeid = -1;
636 lkb->lkb_grmode = DLM_LOCK_IV;
637 kref_init(&lkb->lkb_ref);
34e22bed 638 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
ef0c2bb0 639 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
3ae1acf9 640 INIT_LIST_HEAD(&lkb->lkb_time_list);
8304d6f2 641 INIT_LIST_HEAD(&lkb->lkb_astqueue);
e7fd4179 642
3d6aa675
DT
643 retry:
644 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
645 if (!rv)
646 return -ENOMEM;
e7fd4179 647
3d6aa675
DT
648 spin_lock(&ls->ls_lkbidr_spin);
649 rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
650 if (!rv)
651 lkb->lkb_id = id;
652 spin_unlock(&ls->ls_lkbidr_spin);
e7fd4179 653
3d6aa675
DT
654 if (rv == -EAGAIN)
655 goto retry;
e7fd4179 656
3d6aa675
DT
657 if (rv < 0) {
658 log_error(ls, "create_lkb idr error %d", rv);
659 return rv;
e7fd4179
DT
660 }
661
e7fd4179
DT
662 *lkb_ret = lkb;
663 return 0;
664}
665
e7fd4179
DT
/* Look up an lkb by its lkid in the lockspace idr, taking a reference
   on it under ls_lkbidr_spin.  Returns 0 with *lkb_ret set, or -ENOENT
   (with *lkb_ret NULL) if no such id exists. */

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
679
/* kref release function for an lkb: only asserts it is off all status
   queues; the detach and free happen in the caller after kref_put()
   returns (see comment below). */

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
689
b3f58d8f
DT
690/* __put_lkb() is used when an lkb may not have an rsb attached to
691 it so we need to provide the lockspace explicitly */
692
/* Drop a reference on the lkb; on the final put, remove its id from
   the idr, detach it from its rsb, free a master-copy LVB buffer and
   free the lkb itself.  Returns 1 if the lkb was freed, 0 otherwise.
   Takes ls explicitly because the lkb may have no rsb attached. */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}
714
/* Exported put for an lkb known to be attached to an rsb: derive the
   lockspace from the rsb and delegate to __put_lkb().  Returns 1 if
   the lkb was freed, 0 otherwise. */

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
725
726/* This is only called to add a reference when the code already holds
727 a valid reference to the lkb, so there's no need for locking. */
728
/* Take an extra reference; caller already holds a valid reference
   (see comment above), so no locking is required. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
745
746static void lkb_add_ordered(struct list_head *new, struct list_head *head,
747 int mode)
748{
749 struct dlm_lkb *lkb = NULL;
750
751 list_for_each_entry(lkb, head, lkb_statequeue)
752 if (lkb->lkb_rqmode < mode)
753 break;
754
99fb19d4 755 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
e7fd4179
DT
756}
757
758/* add/remove lkb to rsb's grant/convert/wait queue */
759
/* Put the lkb on one of the rsb's three status queues (wait, grant,
   convert), taking a reference and stamping the time.  The lkb must
   not already be on a queue.  HEADQUE requests go to the front of the
   wait/convert queues; the grant queue is kept ordered by grmode. */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
793
/* Take the lkb off its current status queue and drop the reference
   taken by add_lkb(); must not drop the last reference (unhold_lkb). */

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

/* Move the lkb from its current status queue to the one for sts; the
   temporary hold keeps the refcount from hitting zero in between. */

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
808
ef0c2bb0
DT
809static int msg_reply_type(int mstype)
810{
811 switch (mstype) {
812 case DLM_MSG_REQUEST:
813 return DLM_MSG_REQUEST_REPLY;
814 case DLM_MSG_CONVERT:
815 return DLM_MSG_CONVERT_REPLY;
816 case DLM_MSG_UNLOCK:
817 return DLM_MSG_UNLOCK_REPLY;
818 case DLM_MSG_CANCEL:
819 return DLM_MSG_CANCEL_REPLY;
820 case DLM_MSG_LOOKUP:
821 return DLM_MSG_LOOKUP_REPLY;
822 }
823 return -1;
824}
825
c6ff669b
DT
/* Record that nodeid has been warned about in the fixed-size table
   'warned' (zero entries are free slots).  Returns 1 if the nodeid was
   already recorded, 0 if it was added now (or if the table is full). */

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i = 0;

	while (i < num_nodes) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
		i++;
	}
	return 0;
}
840
841void dlm_scan_waiters(struct dlm_ls *ls)
842{
843 struct dlm_lkb *lkb;
844 ktime_t zero = ktime_set(0, 0);
845 s64 us;
846 s64 debug_maxus = 0;
847 u32 debug_scanned = 0;
848 u32 debug_expired = 0;
849 int num_nodes = 0;
850 int *warned = NULL;
851
852 if (!dlm_config.ci_waitwarn_us)
853 return;
854
855 mutex_lock(&ls->ls_waiters_mutex);
856
857 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
858 if (ktime_equal(lkb->lkb_wait_time, zero))
859 continue;
860
861 debug_scanned++;
862
863 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
864
865 if (us < dlm_config.ci_waitwarn_us)
866 continue;
867
868 lkb->lkb_wait_time = zero;
869
870 debug_expired++;
871 if (us > debug_maxus)
872 debug_maxus = us;
873
874 if (!num_nodes) {
875 num_nodes = ls->ls_num_nodes;
5d70828a 876 warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
c6ff669b
DT
877 }
878 if (!warned)
879 continue;
880 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
881 continue;
882
883 log_error(ls, "waitwarn %x %lld %d us check connection to "
884 "node %d", lkb->lkb_id, (long long)us,
885 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
886 }
887 mutex_unlock(&ls->ls_waiters_mutex);
5d70828a 888 kfree(warned);
c6ff669b
DT
889
890 if (debug_expired)
891 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
892 debug_scanned, debug_expired,
893 dlm_config.ci_waitwarn_us, (long long)debug_maxus);
894}
895
e7fd4179
DT
896/* add/remove lkb from global waiters list of lkb's waiting for
897 a reply from a remote node */
898
c6ff669b 899static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
e7fd4179
DT
900{
901 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
ef0c2bb0 902 int error = 0;
e7fd4179 903
90135925 904 mutex_lock(&ls->ls_waiters_mutex);
ef0c2bb0
DT
905
906 if (is_overlap_unlock(lkb) ||
907 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
908 error = -EINVAL;
909 goto out;
910 }
911
912 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
913 switch (mstype) {
914 case DLM_MSG_UNLOCK:
915 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
916 break;
917 case DLM_MSG_CANCEL:
918 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
919 break;
920 default:
921 error = -EBUSY;
922 goto out;
923 }
924 lkb->lkb_wait_count++;
925 hold_lkb(lkb);
926
43279e53 927 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
ef0c2bb0
DT
928 lkb->lkb_id, lkb->lkb_wait_type, mstype,
929 lkb->lkb_wait_count, lkb->lkb_flags);
e7fd4179
DT
930 goto out;
931 }
ef0c2bb0
DT
932
933 DLM_ASSERT(!lkb->lkb_wait_count,
934 dlm_print_lkb(lkb);
935 printk("wait_count %d\n", lkb->lkb_wait_count););
936
937 lkb->lkb_wait_count++;
e7fd4179 938 lkb->lkb_wait_type = mstype;
c6ff669b
DT
939 lkb->lkb_wait_time = ktime_get();
940 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
ef0c2bb0 941 hold_lkb(lkb);
e7fd4179
DT
942 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
943 out:
ef0c2bb0 944 if (error)
43279e53 945 log_error(ls, "addwait error %x %d flags %x %d %d %s",
ef0c2bb0
DT
946 lkb->lkb_id, error, lkb->lkb_flags, mstype,
947 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
90135925 948 mutex_unlock(&ls->ls_waiters_mutex);
ef0c2bb0 949 return error;
e7fd4179
DT
950}
951
b790c3b7
DT
952/* We clear the RESEND flag because we might be taking an lkb off the waiters
953 list as part of process_requestqueue (e.g. a lookup that has an optimized
954 request reply on the requestqueue) between dlm_recover_waiters_pre() which
955 set RESEND and dlm_recover_waiters_post() */
956
43279e53
DT
/* Account one reply of type mstype against the lkb's waiters state and
   drop one wait_count/ref, delisting the lkb once the count reaches
   zero.  Caller holds ls_waiters_mutex (except for stub replies, see
   remove_from_waiters_ms).  Returns 0 if a wait was consumed, -1 if the
   reply doesn't match any outstanding wait. */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	/* a reply to an overlapping force-unlock */
	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	/* a reply to an overlapping cancel */
	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and premptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* see comment above this function about clearing RESEND */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1040
ef0c2bb0 1041static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
e7fd4179
DT
1042{
1043 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1044 int error;
1045
90135925 1046 mutex_lock(&ls->ls_waiters_mutex);
43279e53 1047 error = _remove_from_waiters(lkb, mstype, NULL);
90135925 1048 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179
DT
1049 return error;
1050}
1051
ef0c2bb0
DT
1052/* Handles situations where we might be processing a "fake" or "stub" reply in
1053 which we can't try to take waiters_mutex again. */
1054
1055static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1056{
1057 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1058 int error;
1059
2a7ce0ed 1060 if (ms->m_flags != DLM_IFL_STUB_MS)
ef0c2bb0 1061 mutex_lock(&ls->ls_waiters_mutex);
43279e53 1062 error = _remove_from_waiters(lkb, ms->m_type, ms);
2a7ce0ed 1063 if (ms->m_flags != DLM_IFL_STUB_MS)
ef0c2bb0
DT
1064 mutex_unlock(&ls->ls_waiters_mutex);
1065 return error;
1066}
1067
e7fd4179
DT
1068static void dir_remove(struct dlm_rsb *r)
1069{
1070 int to_nodeid;
1071
1072 if (dlm_no_directory(r->res_ls))
1073 return;
1074
1075 to_nodeid = dlm_dir_nodeid(r);
1076 if (to_nodeid != dlm_our_nodeid())
1077 send_remove(r);
1078 else
1079 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1080 r->res_name, r->res_length);
1081}
1082
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

/* Free rsb's on hash bucket b's toss list whose toss time has aged past
   ci_toss_secs.  Returns the number of rsb's freed. */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		/* walk oldest to newest, stop at the first expired rsb */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			/* last reference dropped: unhash while still holding
			   the bucket lock, then free outside it */
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			/* NOTE(review): r->res_name is read here after the
			   bucket lock is dropped; presumably the remaining
			   refcount keeps r valid -- confirm */
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
1124
1125void dlm_scan_rsbs(struct dlm_ls *ls)
1126{
1127 int i;
1128
e7fd4179
DT
1129 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1130 shrink_bucket(ls, i);
85e86edf
DT
1131 if (dlm_locking_stopped(ls))
1132 break;
e7fd4179
DT
1133 cond_resched();
1134 }
1135}
1136
3ae1acf9
DT
1137static void add_timeout(struct dlm_lkb *lkb)
1138{
1139 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1140
eeda418d 1141 if (is_master_copy(lkb))
3ae1acf9 1142 return;
3ae1acf9
DT
1143
1144 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1145 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1146 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1147 goto add_it;
1148 }
84d8cd69
DT
1149 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1150 goto add_it;
3ae1acf9
DT
1151 return;
1152
1153 add_it:
1154 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1155 mutex_lock(&ls->ls_timeout_mutex);
1156 hold_lkb(lkb);
3ae1acf9
DT
1157 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1158 mutex_unlock(&ls->ls_timeout_mutex);
1159}
1160
/* Remove lkb from the lockspace timeout list if it is queued there,
   dropping the hold taken by add_timeout().  Safe to call when the lkb
   is not on the list. */

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
1172
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout. We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

/* Scan the lockspace timeout list; for each overdue lkb either emit a
   one-shot timeout warning, cancel the lock, or both.  One lkb is handled
   per pass so timeout_mutex is never held across lock_rsb. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			/* microseconds this lkb has been waiting */
			wait_us = ktime_to_us(ktime_sub(ktime_get(),
					      lkb->lkb_timestamp));

			/* lkb_timeout_cs is centiseconds: cs * 10000 = us */
			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			/* hold keeps lkb valid after the mutex is dropped */
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		/* drop the hold taken inside the list walk above */
		dlm_put_lkb(lkb);
	}
}
1242
/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

/* After recovery, shift every tracked timestamp forward by the time spent
   in recovery so that recovery downtime does not count toward timeouts. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		/* restart the wait-warning clock; NOTE(review): only
		   nonzero wait times are reset -- presumably zero means
		   "not being timed", confirm against where lkb_wait_time
		   is first set */
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
1267
e7fd4179
DT
/* lkb is master or local copy */

/* Apply the lvb operation for a grant: either copy the rsb lvb out to the
   caller's buffer, or write the caller's lvb into the rsb (possibly
   invalidating it), per the grmode/rqmode operation table. */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* return rsb lvb to the caller, if one is wanted/available */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			/* caller asked to invalidate the lvb */
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	/* note: the early returns above deliberately skip this as well */
	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1320
1321static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1322{
1323 if (lkb->lkb_grmode < DLM_LOCK_PW)
1324 return;
1325
1326 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1327 rsb_set_flag(r, RSB_VALNOTVALID);
1328 return;
1329 }
1330
1331 if (!lkb->lkb_lvbptr)
1332 return;
1333
1334 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1335 return;
1336
1337 if (!r->res_lvbptr)
52bda2b5 1338 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
e7fd4179
DT
1339
1340 if (!r->res_lvbptr)
1341 return;
1342
1343 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1344 r->res_lvbseq++;
1345 rsb_clear_flag(r, RSB_VALNOTVALID);
1346}
1347
1348/* lkb is process copy (pc) */
1349
1350static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1351 struct dlm_message *ms)
1352{
1353 int b;
1354
1355 if (!lkb->lkb_lvbptr)
1356 return;
1357
1358 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1359 return;
1360
597d0cae 1361 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
e7fd4179
DT
1362 if (b == 1) {
1363 int len = receive_extralen(ms);
a9cc9159
AV
1364 if (len > DLM_RESNAME_MAXLEN)
1365 len = DLM_RESNAME_MAXLEN;
e7fd4179
DT
1366 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1367 lkb->lkb_lvbseq = ms->m_lvbseq;
1368 }
1369}
1370
1371/* Manipulate lkb's on rsb's convert/granted/waiting queues
1372 remove_lock -- used for unlock, removes lkb from granted
1373 revert_lock -- used for cancel, moves lkb from convert to granted
1374 grant_lock -- used for request and convert, adds lkb to granted or
1375 moves lkb from convert or waiting to granted
1376
1377 Each of these is used for master or local copy lkb's. There is
1378 also a _pc() variation used to make the corresponding change on
1379 a process copy (pc) lkb. */
1380
/* Take lkb off its rsb queue and drop its granted mode; used for unlock. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1389
/* Unlock on the master/local copy: write back the lvb, then remove. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1395
/* Unlock on a process copy: no lvb handling, the master did that. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1400
ef0c2bb0
DT
/* returns: 0 did nothing
   1 moved lock to granted
   -1 removed lock */

/* Undo an in-progress request or conversion for cancel: a converting lock
   falls back to its granted mode, a waiting (new) request is removed. */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		/* nothing in progress to undo */
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}
1431
/* Cancel on a process copy; same queue manipulation as the master side. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1436
/* Grant lkb its requested mode, moving (or adding) it to the granted
   queue, then clear the request. */

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		/* nonzero lkb_status means lkb is already on an rsb queue */
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}
1449
/* Grant on the master/local copy: handle the lvb, grant, reset basts. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1456
/* Grant on a process copy: take the lvb from the reply message. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1463
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		/* the owner is remote: tell it about the grant */
		send_grant(r, lkb);
	else
		/* local owner: deliver the completion ast directly */
		queue_cast(r, lkb, 0);
}
1476
7d3c1feb
DT
1477/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1478 change the granted/requested modes. We're munging things accordingly in
1479 the process copy.
1480 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1481 conversion deadlock
1482 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1483 compatible with other granted locks */
1484
/* Mirror a master-side CONVDEADLK demotion on the process copy by
   forcing grmode down to NL. */

static void munge_demoted(struct dlm_lkb *lkb)
{
	/* a demotion only makes sense mid-conversion, so both modes
	   must be valid */
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}
1495
1496static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1497{
1498 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1499 ms->m_type != DLM_MSG_GRANT) {
1500 log_print("munge_altmode %x invalid reply type %d",
1501 lkb->lkb_id, ms->m_type);
1502 return;
1503 }
1504
1505 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1506 lkb->lkb_rqmode = DLM_LOCK_PR;
1507 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1508 lkb->lkb_rqmode = DLM_LOCK_CW;
1509 else {
1510 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1511 dlm_print_lkb(lkb);
1512 }
1513}
1514
e7fd4179
DT
1515static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1516{
1517 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1518 lkb_statequeue);
1519 if (lkb->lkb_id == first->lkb_id)
90135925 1520 return 1;
e7fd4179 1521
90135925 1522 return 0;
e7fd4179
DT
1523}
1524
e7fd4179
DT
1525/* Check if the given lkb conflicts with another lkb on the queue. */
1526
1527static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1528{
1529 struct dlm_lkb *this;
1530
1531 list_for_each_entry(this, head, lkb_statequeue) {
1532 if (this == lkb)
1533 continue;
3bcd3687 1534 if (!modes_compat(this, lkb))
90135925 1535 return 1;
e7fd4179 1536 }
90135925 1537 return 0;
e7fd4179
DT
1538}
1539
1540/*
1541 * "A conversion deadlock arises with a pair of lock requests in the converting
1542 * queue for one resource. The granted mode of each lock blocks the requested
1543 * mode of the other lock."
1544 *
c85d65e9
DT
1545 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1546 * convert queue from being granted, then deadlk/demote lkb.
e7fd4179
DT
1547 *
1548 * Example:
1549 * Granted Queue: empty
1550 * Convert Queue: NL->EX (first lock)
1551 * PR->EX (second lock)
1552 *
1553 * The first lock can't be granted because of the granted mode of the second
1554 * lock and the second lock can't be granted because it's not first in the
c85d65e9
DT
1555 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1556 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1557 * flag set and return DEMOTED in the lksb flags.
e7fd4179 1558 *
c85d65e9
DT
1559 * Originally, this function detected conv-deadlk in a more limited scope:
1560 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1561 * - if lkb1 was the first entry in the queue (not just earlier), and was
1562 * blocked by the granted mode of lkb2, and there was nothing on the
1563 * granted queue preventing lkb1 from being granted immediately, i.e.
1564 * lkb2 was the only thing preventing lkb1 from being granted.
1565 *
1566 * That second condition meant we'd only say there was conv-deadlk if
1567 * resolving it (by demotion) would lead to the first lock on the convert
1568 * queue being granted right away. It allowed conversion deadlocks to exist
1569 * between locks on the convert queue while they couldn't be granted anyway.
1570 *
1571 * Now, we detect and take action on conversion deadlocks immediately when
1572 * they're created, even if they may not be immediately consequential. If
1573 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1574 * mode that would prevent lkb1's conversion from being granted, we do a
1575 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1576 * I think this means that the lkb_is_ahead condition below should always
1577 * be zero, i.e. there will never be conv-deadlk between two locks that are
1578 * both already on the convert queue.
e7fd4179
DT
1579 */
1580
/* Return 1 if lkb2's granted mode creates a conversion deadlock with any
   other lock on the convert queue, else 0.  See the long comment above
   for why the checks differ before and after lkb2's own position. */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			/* lkb1 is ahead: deadlock if lkb2's granted mode
			   blocks lkb1's requested conversion */
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			/* lkb2 is ahead: deadlock only if each blocks
			   the other's conversion */
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}
1603
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted. In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues... As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks. This flag is not valid for conversion requests.
	 *
	 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode. We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1. The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis. This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion. This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource. Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order. The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
1732
c85d65e9
DT
1733static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1734 int *err)
e7fd4179 1735{
e7fd4179
DT
1736 int rv;
1737 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
c85d65e9
DT
1738 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1739
1740 if (err)
1741 *err = 0;
e7fd4179
DT
1742
1743 rv = _can_be_granted(r, lkb, now);
1744 if (rv)
1745 goto out;
1746
c85d65e9
DT
1747 /*
1748 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1749 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1750 * cancels one of the locks.
1751 */
1752
1753 if (is_convert && can_be_queued(lkb) &&
1754 conversion_deadlock_detect(r, lkb)) {
1755 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1756 lkb->lkb_grmode = DLM_LOCK_NL;
1757 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1758 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1759 if (err)
1760 *err = -EDEADLK;
1761 else {
1762 log_print("can_be_granted deadlock %x now %d",
1763 lkb->lkb_id, now);
1764 dlm_dump_rsb(r);
1765 }
1766 }
e7fd4179 1767 goto out;
c85d65e9 1768 }
e7fd4179 1769
c85d65e9
DT
1770 /*
1771 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1772 * to grant a request in a mode other than the normal rqmode. It's a
1773 * simple way to provide a big optimization to applications that can
1774 * use them.
1775 */
1776
1777 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
e7fd4179 1778 alt = DLM_LOCK_PR;
c85d65e9 1779 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
e7fd4179
DT
1780 alt = DLM_LOCK_CW;
1781
1782 if (alt) {
1783 lkb->lkb_rqmode = alt;
1784 rv = _can_be_granted(r, lkb, now);
1785 if (rv)
1786 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1787 else
1788 lkb->lkb_rqmode = rqmode;
1789 }
1790 out:
1791 return rv;
1792}
1793
c85d65e9
DT
1794/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1795 for locks pending on the convert list. Once verified (watch for these
1796 log_prints), we should be able to just call _can_be_granted() and not
1797 bother with the demote/deadlk cases here (and there's no easy way to deal
1798 with a deadlk here, we'd have to generate something like grant_lock with
1799 the deadlk error.) */
1800
36509258
DT
/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted lkb to resolve a conversion
		   deadlock: rescan, since the demotion may unblock others */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	/* after a demotion, retry once more to grant what it unblocked */
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1855
36509258 1856static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
e7fd4179
DT
1857{
1858 struct dlm_lkb *lkb, *s;
1859
1860 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
c85d65e9 1861 if (can_be_granted(r, lkb, 0, NULL))
e7fd4179 1862 grant_lock_pending(r, lkb);
36509258 1863 else {
e7fd4179 1864 high = max_t(int, lkb->lkb_rqmode, high);
36509258
DT
1865 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1866 *cw = 1;
1867 }
e7fd4179
DT
1868 }
1869
1870 return high;
1871}
1872
36509258
DT
1873/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1874 on either the convert or waiting queue.
1875 high is the largest rqmode of all locks blocked on the convert or
1876 waiting queue. */
1877
1878static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1879{
1880 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1881 if (gr->lkb_highbast < DLM_LOCK_EX)
1882 return 1;
1883 return 0;
1884 }
1885
1886 if (gr->lkb_highbast < high &&
1887 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1888 return 1;
1889 return 0;
1890}
1891
e7fd4179
DT
/* Grant everything grantable on the convert and wait queues, then send
   blocking ASTs to granted locks that stand in the way of what's left. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	/* nothing left blocked, so no basts needed */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			/* a PR holder blocking a CW request is asked for CW,
			   not for the (compatible) high mode PR */
			if (cw && high == DLM_LOCK_PR &&
			    lkb->lkb_grmode == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1923
36509258
DT
1924static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1925{
1926 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1927 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1928 if (gr->lkb_highbast < DLM_LOCK_EX)
1929 return 1;
1930 return 0;
1931 }
1932
1933 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1934 return 1;
1935 return 0;
1936}
1937
e7fd4179
DT
1938static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1939 struct dlm_lkb *lkb)
1940{
1941 struct dlm_lkb *gr;
1942
1943 list_for_each_entry(gr, head, lkb_statequeue) {
314dd2a0
SW
1944 /* skip self when sending basts to convertqueue */
1945 if (gr == lkb)
1946 continue;
e5dae548 1947 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
e7fd4179
DT
1948 queue_bast(r, gr, lkb->lkb_rqmode);
1949 gr->lkb_highbast = lkb->lkb_rqmode;
1950 }
1951 }
1952}
1953
/* Notify granted lock holders that block lkb's request. */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1958
/* Notify holders on both the granted and convert queues that block lkb. */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1964
1965/* set_master(r, lkb) -- set the master nodeid of a resource
1966
1967 The purpose of this function is to set the nodeid field in the given
1968 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1969 known, it can just be copied to the lkb and the function will return
1970 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1971 before it can be copied to the lkb.
1972
1973 When the rsb nodeid is being looked up remotely, the initial lkb
1974 causing the lookup is kept on the ls_waiters list waiting for the
1975 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1976 on the rsb's res_lookup list until the master is verified.
1977
1978 Return values:
1979 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1980 1: the rsb master is not available and the lkb has been placed on
1981 a wait queue
1982*/
1983
/* Set lkb_nodeid from the rsb master nodeid, looking the master up in the
   resource directory when it is unknown.  See the comment block above for
   the full contract; returns 0 when the nodeid is set, 1 when the lkb was
   queued pending a lookup, or a negative directory-lookup error. */

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		/* recovery left a tentative master; this lkb's request will
		   confirm or correct it */
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb already has a lookup in flight; wait behind it */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid 0 means we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* res_nodeid -1: master unknown, must consult the directory */
	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	/* NOTE(review): on -EEXIST we fall through and use ret_nodeid --
	   presumably dlm_dir_lookup fills it in that case; confirm */
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
2049
/* resubmit every lkb that was parked on res_lookup waiting for this
   rsb's master to be resolved; called with the rsb locked */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();	/* yield between entries */
	}
}
2060
2061/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2062
static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	/* nothing to confirm unless a "first" request is outstanding */
	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		/* the first request was granted or queued, so the master
		   is confirmed; release the lkb's waiting on the lookup */
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		}
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
2099
/* validate dlm_lock() arguments and pack them into *args;
   returns -EINVAL for any disallowed flag/mode combination */

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, unsigned long timeout_cs,
			 void (*ast) (void *astparam),
			 void *astparam,
			 void (*bast) (void *astparam, int mode),
			 struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	/* CANCEL belongs to dlm_unlock(), not dlm_lock() */
	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	/* EXPEDITE only makes sense for a new NL request */
	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astfn = ast;
	args->astparam = astparam;
	args->bastfn = bast;
	args->timeout = timeout_cs;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
2165
2166static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2167{
2168 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2169 DLM_LKF_FORCEUNLOCK))
2170 return -EINVAL;
2171
ef0c2bb0
DT
2172 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2173 return -EINVAL;
2174
e7fd4179 2175 args->flags = flags;
e5dae548 2176 args->astparam = astarg;
e7fd4179
DT
2177 return 0;
2178}
2179
/* check that the prepared args are legal for this lkb's current state
   and copy them into the lkb; -EINVAL for bad usage, -EBUSY when the
   lkb is mid-operation and can't be converted yet */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* a locally mastered copy of a remote lock can't be
		   converted through this path */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* a reply for a previous op is still outstanding */
		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
			  lkb->lkb_status, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2223
ef0c2bb0
DT
2224/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2225 for success */
2226
2227/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2228 because there may be a lookup in progress and it's valid to do
2229 cancel/unlockf on it */
2230
e7fd4179
DT
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	/* a master-copy lkb is managed by the remote owner, never
	   unlocked directly here */
	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			/* op will be retried after recovery; record the
			   overlapping cancel instead of sending one now */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress? in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2360
2361/*
2362 * Four stage 4 varieties:
2363 * do_request(), do_convert(), do_unlock(), do_cancel()
2364 * These are called on the master node for the given lock and
2365 * from the central locking logic.
2366 */
2367
/* master-side request: grant now, queue on the waiting list, or fail
   with -EAGAIN when the lock can be neither granted nor queued */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		add_timeout(lkb);
		goto out;
	}

	/* NOQUEUE-style request that can't be granted immediately */
	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
2390
cf6620ac
DT
2391static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2392 int error)
2393{
2394 switch (error) {
2395 case -EAGAIN:
2396 if (force_blocking_asts(lkb))
2397 send_blocking_asts_all(r, lkb);
2398 break;
2399 case -EINPROGRESS:
2400 send_blocking_asts(r, lkb);
2401 break;
2402 }
2403}
2404
e7fd4179
DT
/* master-side conversion: grant, detect conversion deadlock, handle
   CONVDEADLK auto-demotion, or move the lkb to the convert queue */

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	queue_cast(r, lkb, -EAGAIN);
 out:
	return error;
}
2459
cf6620ac
DT
2460static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2461 int error)
2462{
2463 switch (error) {
2464 case 0:
2465 grant_pending_locks(r);
2466 /* grant_pending_locks also sends basts */
2467 break;
2468 case -EAGAIN:
2469 if (force_blocking_asts(lkb))
2470 send_blocking_asts_all(r, lkb);
2471 break;
2472 case -EINPROGRESS:
2473 send_blocking_asts(r, lkb);
2474 break;
2475 }
2476}
2477
e7fd4179
DT
/* master-side unlock: remove the lock and queue the completion ast;
   always returns -DLM_EUNLOCK */

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	return -DLM_EUNLOCK;
}
2484
cf6620ac
DT
/* an unlock frees the resource, so other locks may now be grantable */

static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r);
}
2490
ef0c2bb0 2491/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
907b9bce 2492
e7fd4179
DT
2493static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2494{
ef0c2bb0
DT
2495 int error;
2496
2497 error = revert_lock(r, lkb);
2498 if (error) {
2499 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
2500 return -DLM_ECANCEL;
2501 }
2502 return 0;
e7fd4179
DT
2503}
2504
cf6620ac
DT
/* a successful cancel may free up the resource for pending locks */

static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	if (!error)
		return;

	grant_pending_locks(r);
}
2511
e7fd4179
DT
2512/*
2513 * Four stage 3 varieties:
2514 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2515 */
2516
2517/* add a new lkb to a possibly new rsb, called by requesting process */
2518
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r */

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error) {
		/* master lookup in progress; the lkb has been queued and
		   will be resubmitted when the lookup completes */
		error = 0;
		goto out;
	}

	if (is_remote(r)) {
		/* receive_request() calls do_request() on remote node */
		error = send_request(r, lkb);
	} else {
		error = do_request(r, lkb);
		/* for remote locks the request_reply is sent
		   between do_request and do_request_effects */
		do_request_effects(r, lkb, error);
	}
 out:
	return error;
}
2545
3bcd3687 2546/* change some property of an existing lkb, e.g. mode */
e7fd4179
DT
2547
/* stage 3 convert: run the conversion locally when we are the master,
   otherwise forward it to the master node */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (!is_remote(r)) {
		error = do_convert(r, lkb);
		/* for remote locks the convert_reply is sent
		   between do_convert and do_convert_effects */
		do_convert_effects(r, lkb, error);
	} else {
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	}

	return error;
}
2564
2565/* remove an existing lkb from the granted queue */
2566
/* stage 3 unlock: run locally when we are the master, otherwise
   forward to the master node */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (!is_remote(r)) {
		error = do_unlock(r, lkb);
		/* for remote locks the unlock_reply is sent
		   between do_unlock and do_unlock_effects */
		do_unlock_effects(r, lkb, error);
	} else {
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	}

	return error;
}
2583
2584/* remove an existing lkb from the convert or wait queue */
2585
/* stage 3 cancel: run locally when we are the master, otherwise
   forward to the master node */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (!is_remote(r)) {
		error = do_cancel(r, lkb);
		/* for remote locks the cancel_reply is sent
		   between do_cancel and do_cancel_effects */
		do_cancel_effects(r, lkb, error);
	} else {
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	}

	return error;
}
2602
2603/*
2604 * Four stage 2 varieties:
2605 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2606 */
2607
/* stage 2 request: validate args, find/create the rsb, attach the new
   lkb to it, and hand off to stage 3 with the rsb locked */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	/* hand the new lock id back to the caller via the lksb */
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);
 out:
	return error;
}
2635
/* stage 2 convert: the lkb already has an rsb; validate under the rsb
   lock (convert args can't be checked before locking the rsb) */

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2657
/* stage 2 unlock: validate against the lkb's in-flight state under the
   rsb lock, then hand off to stage 3 */

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2679
/* stage 2 cancel: same shape as unlock_lock(), dispatching to the
   cancel variant of stage 3 */

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2701
2702/*
2703 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2704 */
2705
/* public entry point for lock requests and conversions; errors that
   indicate asynchronous completion (-EINPROGRESS) or that are reported
   through the ast (-EAGAIN, -EDEADLK) are mapped to 0 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block recovery from running while the op is in flight */
	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the ref from find_lkb(), or from create_lkb() on failure */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2758
/* public entry point for unlock and cancel; -DLM_EUNLOCK/-DLM_ECANCEL
   indicate success (reported via the ast), and -EBUSY is success for
   CANCEL/FORCEUNLOCK (see comment above validate_unlock_args) */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block recovery from running while the op is in flight */
	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2800
2801/*
2802 * send/receive routines for remote operations and replies
2803 *
2804 * send_args
2805 * send_common
2806 * send_request receive_request
2807 * send_convert receive_convert
2808 * send_unlock receive_unlock
2809 * send_cancel receive_cancel
2810 * send_grant receive_grant
2811 * send_bast receive_bast
2812 * send_lookup receive_lookup
2813 * send_remove receive_remove
2814 *
2815 * send_common_reply
2816 * receive_request_reply send_request_reply
2817 * receive_convert_reply send_convert_reply
2818 * receive_unlock_reply send_unlock_reply
2819 * receive_cancel_reply send_cancel_reply
2820 * receive_lookup_reply send_lookup_reply
2821 */
2822
7e4dac33
DT
/* allocate a lowcomms buffer for a message of mb_len bytes to
   to_nodeid and fill in the common header fields */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2856
7e4dac33
DT
/* size the message for mstype: name-carrying messages append the
   resource name, lvb-carrying ones append the lvb */

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2883
e7fd4179
DT
2884/* further lowcomms enhancements or alternate implementations may make
2885 the return value from this function useful at some point */
2886
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	/* convert to wire byte order before committing the buffer */
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2893
/* copy lkb state into an outgoing message; the receiver picks out only
   the fields relevant to the message type */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastfn)
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2937
/* send a request/convert/unlock/cancel to the master node; the lkb is
   put on the waiters list *before* sending so a fast reply can't race,
   and taken off again if the send fails */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = r->res_nodeid;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2965
2966static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2967{
2968 return send_common(r, lkb, DLM_MSG_REQUEST);
2969}
2970
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		/* fake a successful convert_reply locally using the
		   lockspace's stub message */
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2988
2989/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2990 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2991 that the master is still correct. */
2992
2993static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2994{
2995 return send_common(r, lkb, DLM_MSG_UNLOCK);
2996}
2997
2998static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2999{
3000 return send_common(r, lkb, DLM_MSG_CANCEL);
3001}
3002
/* master -> owner: notify that the lock has been granted */

static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}
3023
/* master -> owner: deliver a blocking ast for the given mode */

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	/* no lkb passed: a bast message carries no lvb */
	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}
3044
/* ask the directory node for the resource's master; mirrors
   send_common() including the waiters bookkeeping */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
3072
/* tell the directory node to drop its entry for this resource;
   no lkb is involved, so send_args() is not used */

static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}
3092
/* master -> owner: reply to a request/convert/unlock/cancel with
   result rv */

static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}
3114
3115static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3116{
3117 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3118}
3119
3120static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3121{
3122 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3123}
3124
3125static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3126{
3127 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3128}
3129
3130static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3131{
3132 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3133}
3134
/* directory node -> requester: answer a lookup with the master nodeid
   (ret_nodeid) and result rv; uses the stub rsb since no real rsb is
   involved on this node */

static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	/* echo the requester's lkid so it can match the reply */
	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}
3155
3156/* which args we save from a received message depends heavily on the type
3157 of message, unlike the send side where we can safely send everything about
3158 the lkb for any type of message */
3159
static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 bits of lkb_flags travel on the wire; keep
	   the local-only high bits intact */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
3167
static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* locally generated stub replies carry no real flags */
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	/* only the low 16 bits of lkb_flags travel on the wire */
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
			 (ms->m_flags & 0x0000FFFF);
}
3177
3178static int receive_extralen(struct dlm_message *ms)
3179{
3180 return (ms->m_header.h_length - sizeof(struct dlm_message));
3181}
3182
e7fd4179
DT
/* copy the lvb out of an incoming message when VALBLK is in effect,
   allocating the lkb's lvb buffer on first use */

static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_message *ms)
{
	int len;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
		/* clamp to a sane bound so a bad h_length can't overrun
		   the lvb buffer */
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
	}
	return 0;
}
3200
e5dae548
DT
/* Placeholder bast callback stored in master-copy lkbs; it records that
   the remote holder wanted a bast, but must never actually be invoked. */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3205
/* Placeholder ast callback for master-copy lkbs; must never be invoked. */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3210
e7fd4179
DT
3211static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3212 struct dlm_message *ms)
3213{
3214 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3215 lkb->lkb_ownpid = ms->m_pid;
3216 lkb->lkb_remid = ms->m_lkid;
3217 lkb->lkb_grmode = DLM_LOCK_IV;
3218 lkb->lkb_rqmode = ms->m_rqmode;
e5dae548 3219
8304d6f2
DT
3220 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3221 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
e7fd4179 3222
8d07fd50
DT
3223 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3224 /* lkb was just created so there won't be an lvb yet */
52bda2b5 3225 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
8d07fd50
DT
3226 if (!lkb->lkb_lvbptr)
3227 return -ENOMEM;
3228 }
e7fd4179
DT
3229
3230 return 0;
3231}
3232
3233static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3234 struct dlm_message *ms)
3235{
e7fd4179
DT
3236 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3237 return -EBUSY;
3238
e7fd4179
DT
3239 if (receive_lvb(ls, lkb, ms))
3240 return -ENOMEM;
3241
3242 lkb->lkb_rqmode = ms->m_rqmode;
3243 lkb->lkb_lvbseq = ms->m_lvbseq;
3244
3245 return 0;
3246}
3247
3248static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3249 struct dlm_message *ms)
3250{
e7fd4179
DT
3251 if (receive_lvb(ls, lkb, ms))
3252 return -ENOMEM;
3253 return 0;
3254}
3255
3256/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3257 uses to send a reply and that the remote end uses to process the reply. */
3258
3259static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3260{
3261 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3262 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3263 lkb->lkb_remid = ms->m_lkid;
3264}
3265
c54e04b0
DT
3266/* This is called after the rsb is locked so that we can safely inspect
3267 fields in the lkb. */
3268
3269static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3270{
3271 int from = ms->m_header.h_nodeid;
3272 int error = 0;
3273
3274 switch (ms->m_type) {
3275 case DLM_MSG_CONVERT:
3276 case DLM_MSG_UNLOCK:
3277 case DLM_MSG_CANCEL:
3278 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3279 error = -EINVAL;
3280 break;
3281
3282 case DLM_MSG_CONVERT_REPLY:
3283 case DLM_MSG_UNLOCK_REPLY:
3284 case DLM_MSG_CANCEL_REPLY:
3285 case DLM_MSG_GRANT:
3286 case DLM_MSG_BAST:
3287 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3288 error = -EINVAL;
3289 break;
3290
3291 case DLM_MSG_REQUEST_REPLY:
3292 if (!is_process_copy(lkb))
3293 error = -EINVAL;
3294 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3295 error = -EINVAL;
3296 break;
3297
3298 default:
3299 error = -EINVAL;
3300 }
3301
3302 if (error)
3303 log_error(lkb->lkb_resource->res_ls,
3304 "ignore invalid message %d from %d %x %x %x %d",
3305 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3306 lkb->lkb_flags, lkb->lkb_nodeid);
3307 return error;
3308}
3309
e7fd4179
DT
3310static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3311{
3312 struct dlm_lkb *lkb;
3313 struct dlm_rsb *r;
3314 int error, namelen;
3315
3316 error = create_lkb(ls, &lkb);
3317 if (error)
3318 goto fail;
3319
3320 receive_flags(lkb, ms);
3321 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3322 error = receive_request_args(ls, lkb, ms);
3323 if (error) {
b3f58d8f 3324 __put_lkb(ls, lkb);
e7fd4179
DT
3325 goto fail;
3326 }
3327
3328 namelen = receive_extralen(ms);
3329
3330 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3331 if (error) {
b3f58d8f 3332 __put_lkb(ls, lkb);
e7fd4179
DT
3333 goto fail;
3334 }
3335
3336 lock_rsb(r);
3337
3338 attach_lkb(r, lkb);
3339 error = do_request(r, lkb);
3340 send_request_reply(r, lkb, error);
cf6620ac 3341 do_request_effects(r, lkb, error);
e7fd4179
DT
3342
3343 unlock_rsb(r);
3344 put_rsb(r);
3345
3346 if (error == -EINPROGRESS)
3347 error = 0;
3348 if (error)
b3f58d8f 3349 dlm_put_lkb(lkb);
e7fd4179
DT
3350 return;
3351
3352 fail:
3353 setup_stub_lkb(ls, ms);
3354 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3355}
3356
3357static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3358{
3359 struct dlm_lkb *lkb;
3360 struct dlm_rsb *r;
90135925 3361 int error, reply = 1;
e7fd4179
DT
3362
3363 error = find_lkb(ls, ms->m_remid, &lkb);
3364 if (error)
3365 goto fail;
3366
3367 r = lkb->lkb_resource;
3368
3369 hold_rsb(r);
3370 lock_rsb(r);
3371
c54e04b0
DT
3372 error = validate_message(lkb, ms);
3373 if (error)
3374 goto out;
3375
e7fd4179 3376 receive_flags(lkb, ms);
cf6620ac 3377
e7fd4179 3378 error = receive_convert_args(ls, lkb, ms);
cf6620ac
DT
3379 if (error) {
3380 send_convert_reply(r, lkb, error);
3381 goto out;
3382 }
3383
e7fd4179
DT
3384 reply = !down_conversion(lkb);
3385
3386 error = do_convert(r, lkb);
e7fd4179
DT
3387 if (reply)
3388 send_convert_reply(r, lkb, error);
cf6620ac 3389 do_convert_effects(r, lkb, error);
c54e04b0 3390 out:
e7fd4179
DT
3391 unlock_rsb(r);
3392 put_rsb(r);
b3f58d8f 3393 dlm_put_lkb(lkb);
e7fd4179
DT
3394 return;
3395
3396 fail:
3397 setup_stub_lkb(ls, ms);
3398 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3399}
3400
3401static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3402{
3403 struct dlm_lkb *lkb;
3404 struct dlm_rsb *r;
3405 int error;
3406
3407 error = find_lkb(ls, ms->m_remid, &lkb);
3408 if (error)
3409 goto fail;
3410
3411 r = lkb->lkb_resource;
3412
3413 hold_rsb(r);
3414 lock_rsb(r);
3415
c54e04b0
DT
3416 error = validate_message(lkb, ms);
3417 if (error)
3418 goto out;
3419
e7fd4179 3420 receive_flags(lkb, ms);
cf6620ac 3421
e7fd4179 3422 error = receive_unlock_args(ls, lkb, ms);
cf6620ac
DT
3423 if (error) {
3424 send_unlock_reply(r, lkb, error);
3425 goto out;
3426 }
e7fd4179
DT
3427
3428 error = do_unlock(r, lkb);
e7fd4179 3429 send_unlock_reply(r, lkb, error);
cf6620ac 3430 do_unlock_effects(r, lkb, error);
c54e04b0 3431 out:
e7fd4179
DT
3432 unlock_rsb(r);
3433 put_rsb(r);
b3f58d8f 3434 dlm_put_lkb(lkb);
e7fd4179
DT
3435 return;
3436
3437 fail:
3438 setup_stub_lkb(ls, ms);
3439 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3440}
3441
3442static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3443{
3444 struct dlm_lkb *lkb;
3445 struct dlm_rsb *r;
3446 int error;
3447
3448 error = find_lkb(ls, ms->m_remid, &lkb);
3449 if (error)
3450 goto fail;
3451
3452 receive_flags(lkb, ms);
3453
3454 r = lkb->lkb_resource;
3455
3456 hold_rsb(r);
3457 lock_rsb(r);
3458
c54e04b0
DT
3459 error = validate_message(lkb, ms);
3460 if (error)
3461 goto out;
3462
e7fd4179
DT
3463 error = do_cancel(r, lkb);
3464 send_cancel_reply(r, lkb, error);
cf6620ac 3465 do_cancel_effects(r, lkb, error);
c54e04b0 3466 out:
e7fd4179
DT
3467 unlock_rsb(r);
3468 put_rsb(r);
b3f58d8f 3469 dlm_put_lkb(lkb);
e7fd4179
DT
3470 return;
3471
3472 fail:
3473 setup_stub_lkb(ls, ms);
3474 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3475}
3476
3477static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3478{
3479 struct dlm_lkb *lkb;
3480 struct dlm_rsb *r;
3481 int error;
3482
3483 error = find_lkb(ls, ms->m_remid, &lkb);
3484 if (error) {
c54e04b0
DT
3485 log_debug(ls, "receive_grant from %d no lkb %x",
3486 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3487 return;
3488 }
e7fd4179
DT
3489
3490 r = lkb->lkb_resource;
3491
3492 hold_rsb(r);
3493 lock_rsb(r);
3494
c54e04b0
DT
3495 error = validate_message(lkb, ms);
3496 if (error)
3497 goto out;
3498
e7fd4179 3499 receive_flags_reply(lkb, ms);
7d3c1feb
DT
3500 if (is_altmode(lkb))
3501 munge_altmode(lkb, ms);
e7fd4179
DT
3502 grant_lock_pc(r, lkb, ms);
3503 queue_cast(r, lkb, 0);
c54e04b0 3504 out:
e7fd4179
DT
3505 unlock_rsb(r);
3506 put_rsb(r);
b3f58d8f 3507 dlm_put_lkb(lkb);
e7fd4179
DT
3508}
3509
3510static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3511{
3512 struct dlm_lkb *lkb;
3513 struct dlm_rsb *r;
3514 int error;
3515
3516 error = find_lkb(ls, ms->m_remid, &lkb);
3517 if (error) {
c54e04b0
DT
3518 log_debug(ls, "receive_bast from %d no lkb %x",
3519 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3520 return;
3521 }
e7fd4179
DT
3522
3523 r = lkb->lkb_resource;
3524
3525 hold_rsb(r);
3526 lock_rsb(r);
3527
c54e04b0
DT
3528 error = validate_message(lkb, ms);
3529 if (error)
3530 goto out;
e7fd4179 3531
c54e04b0
DT
3532 queue_bast(r, lkb, ms->m_bastmode);
3533 out:
e7fd4179
DT
3534 unlock_rsb(r);
3535 put_rsb(r);
b3f58d8f 3536 dlm_put_lkb(lkb);
e7fd4179
DT
3537}
3538
3539static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3540{
3541 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3542
3543 from_nodeid = ms->m_header.h_nodeid;
3544 our_nodeid = dlm_our_nodeid();
3545
3546 len = receive_extralen(ms);
3547
3548 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3549 if (dir_nodeid != our_nodeid) {
3550 log_error(ls, "lookup dir_nodeid %d from %d",
3551 dir_nodeid, from_nodeid);
3552 error = -EINVAL;
3553 ret_nodeid = -1;
3554 goto out;
3555 }
3556
3557 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3558
3559 /* Optimization: we're master so treat lookup as a request */
3560 if (!error && ret_nodeid == our_nodeid) {
3561 receive_request(ls, ms);
3562 return;
3563 }
3564 out:
3565 send_lookup_reply(ls, ms, ret_nodeid, error);
3566}
3567
3568static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3569{
3570 int len, dir_nodeid, from_nodeid;
3571
3572 from_nodeid = ms->m_header.h_nodeid;
3573
3574 len = receive_extralen(ms);
3575
3576 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3577 if (dir_nodeid != dlm_our_nodeid()) {
3578 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3579 dir_nodeid, from_nodeid);
3580 return;
3581 }
3582
3583 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3584}
3585
8499137d
DT
3586static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3587{
3588 do_purge(ls, ms->m_nodeid, ms->m_pid);
3589}
3590
e7fd4179
DT
3591static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3592{
3593 struct dlm_lkb *lkb;
3594 struct dlm_rsb *r;
ef0c2bb0 3595 int error, mstype, result;
e7fd4179
DT
3596
3597 error = find_lkb(ls, ms->m_remid, &lkb);
3598 if (error) {
c54e04b0
DT
3599 log_debug(ls, "receive_request_reply from %d no lkb %x",
3600 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3601 return;
3602 }
e7fd4179 3603
e7fd4179
DT
3604 r = lkb->lkb_resource;
3605 hold_rsb(r);
3606 lock_rsb(r);
3607
c54e04b0
DT
3608 error = validate_message(lkb, ms);
3609 if (error)
3610 goto out;
3611
ef0c2bb0
DT
3612 mstype = lkb->lkb_wait_type;
3613 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3614 if (error)
3615 goto out;
3616
e7fd4179
DT
3617 /* Optimization: the dir node was also the master, so it took our
3618 lookup as a request and sent request reply instead of lookup reply */
3619 if (mstype == DLM_MSG_LOOKUP) {
3620 r->res_nodeid = ms->m_header.h_nodeid;
3621 lkb->lkb_nodeid = r->res_nodeid;
3622 }
3623
ef0c2bb0
DT
3624 /* this is the value returned from do_request() on the master */
3625 result = ms->m_result;
3626
3627 switch (result) {
e7fd4179 3628 case -EAGAIN:
ef0c2bb0 3629 /* request would block (be queued) on remote master */
e7fd4179
DT
3630 queue_cast(r, lkb, -EAGAIN);
3631 confirm_master(r, -EAGAIN);
ef0c2bb0 3632 unhold_lkb(lkb); /* undoes create_lkb() */
e7fd4179
DT
3633 break;
3634
3635 case -EINPROGRESS:
3636 case 0:
3637 /* request was queued or granted on remote master */
3638 receive_flags_reply(lkb, ms);
3639 lkb->lkb_remid = ms->m_lkid;
7d3c1feb
DT
3640 if (is_altmode(lkb))
3641 munge_altmode(lkb, ms);
3ae1acf9 3642 if (result) {
e7fd4179 3643 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3ae1acf9
DT
3644 add_timeout(lkb);
3645 } else {
e7fd4179
DT
3646 grant_lock_pc(r, lkb, ms);
3647 queue_cast(r, lkb, 0);
3648 }
ef0c2bb0 3649 confirm_master(r, result);
e7fd4179
DT
3650 break;
3651
597d0cae 3652 case -EBADR:
e7fd4179
DT
3653 case -ENOTBLK:
3654 /* find_rsb failed to find rsb or rsb wasn't master */
ef0c2bb0
DT
3655 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3656 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
e7fd4179
DT
3657 r->res_nodeid = -1;
3658 lkb->lkb_nodeid = -1;
ef0c2bb0
DT
3659
3660 if (is_overlap(lkb)) {
3661 /* we'll ignore error in cancel/unlock reply */
3662 queue_cast_overlap(r, lkb);
aec64e1b 3663 confirm_master(r, result);
ef0c2bb0
DT
3664 unhold_lkb(lkb); /* undoes create_lkb() */
3665 } else
3666 _request_lock(r, lkb);
e7fd4179
DT
3667 break;
3668
3669 default:
ef0c2bb0
DT
3670 log_error(ls, "receive_request_reply %x error %d",
3671 lkb->lkb_id, result);
e7fd4179
DT
3672 }
3673
ef0c2bb0
DT
3674 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3675 log_debug(ls, "receive_request_reply %x result %d unlock",
3676 lkb->lkb_id, result);
3677 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3678 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3679 send_unlock(r, lkb);
3680 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3681 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3682 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3683 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3684 send_cancel(r, lkb);
3685 } else {
3686 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3687 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3688 }
3689 out:
e7fd4179
DT
3690 unlock_rsb(r);
3691 put_rsb(r);
b3f58d8f 3692 dlm_put_lkb(lkb);
e7fd4179
DT
3693}
3694
3695static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3696 struct dlm_message *ms)
3697{
e7fd4179 3698 /* this is the value returned from do_convert() on the master */
ef0c2bb0 3699 switch (ms->m_result) {
e7fd4179
DT
3700 case -EAGAIN:
3701 /* convert would block (be queued) on remote master */
3702 queue_cast(r, lkb, -EAGAIN);
3703 break;
3704
c85d65e9
DT
3705 case -EDEADLK:
3706 receive_flags_reply(lkb, ms);
3707 revert_lock_pc(r, lkb);
3708 queue_cast(r, lkb, -EDEADLK);
3709 break;
3710
e7fd4179
DT
3711 case -EINPROGRESS:
3712 /* convert was queued on remote master */
7d3c1feb
DT
3713 receive_flags_reply(lkb, ms);
3714 if (is_demoted(lkb))
2a7ce0ed 3715 munge_demoted(lkb);
e7fd4179
DT
3716 del_lkb(r, lkb);
3717 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3ae1acf9 3718 add_timeout(lkb);
e7fd4179
DT
3719 break;
3720
3721 case 0:
3722 /* convert was granted on remote master */
3723 receive_flags_reply(lkb, ms);
7d3c1feb 3724 if (is_demoted(lkb))
2a7ce0ed 3725 munge_demoted(lkb);
e7fd4179
DT
3726 grant_lock_pc(r, lkb, ms);
3727 queue_cast(r, lkb, 0);
3728 break;
3729
3730 default:
ef0c2bb0
DT
3731 log_error(r->res_ls, "receive_convert_reply %x error %d",
3732 lkb->lkb_id, ms->m_result);
e7fd4179
DT
3733 }
3734}
3735
3736static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3737{
3738 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3739 int error;
e7fd4179
DT
3740
3741 hold_rsb(r);
3742 lock_rsb(r);
3743
c54e04b0
DT
3744 error = validate_message(lkb, ms);
3745 if (error)
3746 goto out;
3747
ef0c2bb0
DT
3748 /* stub reply can happen with waiters_mutex held */
3749 error = remove_from_waiters_ms(lkb, ms);
3750 if (error)
3751 goto out;
e7fd4179 3752
ef0c2bb0
DT
3753 __receive_convert_reply(r, lkb, ms);
3754 out:
e7fd4179
DT
3755 unlock_rsb(r);
3756 put_rsb(r);
3757}
3758
3759static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3760{
3761 struct dlm_lkb *lkb;
3762 int error;
3763
3764 error = find_lkb(ls, ms->m_remid, &lkb);
3765 if (error) {
c54e04b0
DT
3766 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3767 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3768 return;
3769 }
e7fd4179 3770
e7fd4179 3771 _receive_convert_reply(lkb, ms);
b3f58d8f 3772 dlm_put_lkb(lkb);
e7fd4179
DT
3773}
3774
3775static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3776{
3777 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3778 int error;
e7fd4179
DT
3779
3780 hold_rsb(r);
3781 lock_rsb(r);
3782
c54e04b0
DT
3783 error = validate_message(lkb, ms);
3784 if (error)
3785 goto out;
3786
ef0c2bb0
DT
3787 /* stub reply can happen with waiters_mutex held */
3788 error = remove_from_waiters_ms(lkb, ms);
3789 if (error)
3790 goto out;
3791
e7fd4179
DT
3792 /* this is the value returned from do_unlock() on the master */
3793
ef0c2bb0 3794 switch (ms->m_result) {
e7fd4179
DT
3795 case -DLM_EUNLOCK:
3796 receive_flags_reply(lkb, ms);
3797 remove_lock_pc(r, lkb);
3798 queue_cast(r, lkb, -DLM_EUNLOCK);
3799 break;
ef0c2bb0
DT
3800 case -ENOENT:
3801 break;
e7fd4179 3802 default:
ef0c2bb0
DT
3803 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3804 lkb->lkb_id, ms->m_result);
e7fd4179 3805 }
ef0c2bb0 3806 out:
e7fd4179
DT
3807 unlock_rsb(r);
3808 put_rsb(r);
3809}
3810
3811static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3812{
3813 struct dlm_lkb *lkb;
3814 int error;
3815
3816 error = find_lkb(ls, ms->m_remid, &lkb);
3817 if (error) {
c54e04b0
DT
3818 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3819 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3820 return;
3821 }
e7fd4179 3822
e7fd4179 3823 _receive_unlock_reply(lkb, ms);
b3f58d8f 3824 dlm_put_lkb(lkb);
e7fd4179
DT
3825}
3826
3827static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3828{
3829 struct dlm_rsb *r = lkb->lkb_resource;
ef0c2bb0 3830 int error;
e7fd4179
DT
3831
3832 hold_rsb(r);
3833 lock_rsb(r);
3834
c54e04b0
DT
3835 error = validate_message(lkb, ms);
3836 if (error)
3837 goto out;
3838
ef0c2bb0
DT
3839 /* stub reply can happen with waiters_mutex held */
3840 error = remove_from_waiters_ms(lkb, ms);
3841 if (error)
3842 goto out;
3843
e7fd4179
DT
3844 /* this is the value returned from do_cancel() on the master */
3845
ef0c2bb0 3846 switch (ms->m_result) {
e7fd4179
DT
3847 case -DLM_ECANCEL:
3848 receive_flags_reply(lkb, ms);
3849 revert_lock_pc(r, lkb);
84d8cd69 3850 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
3851 break;
3852 case 0:
e7fd4179
DT
3853 break;
3854 default:
ef0c2bb0
DT
3855 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3856 lkb->lkb_id, ms->m_result);
e7fd4179 3857 }
ef0c2bb0 3858 out:
e7fd4179
DT
3859 unlock_rsb(r);
3860 put_rsb(r);
3861}
3862
3863static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3864{
3865 struct dlm_lkb *lkb;
3866 int error;
3867
3868 error = find_lkb(ls, ms->m_remid, &lkb);
3869 if (error) {
c54e04b0
DT
3870 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3871 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3872 return;
3873 }
e7fd4179 3874
e7fd4179 3875 _receive_cancel_reply(lkb, ms);
b3f58d8f 3876 dlm_put_lkb(lkb);
e7fd4179
DT
3877}
3878
3879static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3880{
3881 struct dlm_lkb *lkb;
3882 struct dlm_rsb *r;
3883 int error, ret_nodeid;
3884
3885 error = find_lkb(ls, ms->m_lkid, &lkb);
3886 if (error) {
3887 log_error(ls, "receive_lookup_reply no lkb");
3888 return;
3889 }
3890
ef0c2bb0 3891 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
e7fd4179 3892 FIXME: will a non-zero error ever be returned? */
e7fd4179
DT
3893
3894 r = lkb->lkb_resource;
3895 hold_rsb(r);
3896 lock_rsb(r);
3897
ef0c2bb0
DT
3898 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3899 if (error)
3900 goto out;
3901
e7fd4179
DT
3902 ret_nodeid = ms->m_nodeid;
3903 if (ret_nodeid == dlm_our_nodeid()) {
3904 r->res_nodeid = 0;
3905 ret_nodeid = 0;
3906 r->res_first_lkid = 0;
3907 } else {
3908 /* set_master() will copy res_nodeid to lkb_nodeid */
3909 r->res_nodeid = ret_nodeid;
3910 }
3911
ef0c2bb0
DT
3912 if (is_overlap(lkb)) {
3913 log_debug(ls, "receive_lookup_reply %x unlock %x",
3914 lkb->lkb_id, lkb->lkb_flags);
3915 queue_cast_overlap(r, lkb);
3916 unhold_lkb(lkb); /* undoes create_lkb() */
3917 goto out_list;
3918 }
3919
e7fd4179
DT
3920 _request_lock(r, lkb);
3921
ef0c2bb0 3922 out_list:
e7fd4179
DT
3923 if (!ret_nodeid)
3924 process_lookup_list(r);
ef0c2bb0 3925 out:
e7fd4179
DT
3926 unlock_rsb(r);
3927 put_rsb(r);
b3f58d8f 3928 dlm_put_lkb(lkb);
e7fd4179
DT
3929}
3930
c36258b5 3931static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
e7fd4179 3932{
46b43eed
DT
3933 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3934 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3935 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3936 ms->m_remid, ms->m_result);
3937 return;
3938 }
3939
e7fd4179
DT
3940 switch (ms->m_type) {
3941
3942 /* messages sent to a master node */
3943
3944 case DLM_MSG_REQUEST:
3945 receive_request(ls, ms);
3946 break;
3947
3948 case DLM_MSG_CONVERT:
3949 receive_convert(ls, ms);
3950 break;
3951
3952 case DLM_MSG_UNLOCK:
3953 receive_unlock(ls, ms);
3954 break;
3955
3956 case DLM_MSG_CANCEL:
3957 receive_cancel(ls, ms);
3958 break;
3959
3960 /* messages sent from a master node (replies to above) */
3961
3962 case DLM_MSG_REQUEST_REPLY:
3963 receive_request_reply(ls, ms);
3964 break;
3965
3966 case DLM_MSG_CONVERT_REPLY:
3967 receive_convert_reply(ls, ms);
3968 break;
3969
3970 case DLM_MSG_UNLOCK_REPLY:
3971 receive_unlock_reply(ls, ms);
3972 break;
3973
3974 case DLM_MSG_CANCEL_REPLY:
3975 receive_cancel_reply(ls, ms);
3976 break;
3977
3978 /* messages sent from a master node (only two types of async msg) */
3979
3980 case DLM_MSG_GRANT:
3981 receive_grant(ls, ms);
3982 break;
3983
3984 case DLM_MSG_BAST:
3985 receive_bast(ls, ms);
3986 break;
3987
3988 /* messages sent to a dir node */
3989
3990 case DLM_MSG_LOOKUP:
3991 receive_lookup(ls, ms);
3992 break;
3993
3994 case DLM_MSG_REMOVE:
3995 receive_remove(ls, ms);
3996 break;
3997
3998 /* messages sent from a dir node (remove has no reply) */
3999
4000 case DLM_MSG_LOOKUP_REPLY:
4001 receive_lookup_reply(ls, ms);
4002 break;
4003
8499137d
DT
4004 /* other messages */
4005
4006 case DLM_MSG_PURGE:
4007 receive_purge(ls, ms);
4008 break;
4009
e7fd4179
DT
4010 default:
4011 log_error(ls, "unknown message type %d", ms->m_type);
4012 }
4013
e7fd4179 4014 dlm_astd_wake();
e7fd4179
DT
4015}
4016
c36258b5
DT
4017/* If the lockspace is in recovery mode (locking stopped), then normal
4018 messages are saved on the requestqueue for processing after recovery is
4019 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4020 messages off the requestqueue before we process new ones. This occurs right
4021 after recovery completes when we transition from saving all messages on
4022 requestqueue, to processing all the saved messages, to processing new
4023 messages as they arrive. */
e7fd4179 4024
c36258b5
DT
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		/* recovery in progress: park the message for later replay */
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		/* let dlm_recoverd drain any saved messages first */
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}
4035
4036/* This is called by dlm_recoverd to process messages that were saved on
4037 the requestqueue. */
4038
/* Called by dlm_recoverd to replay messages that were saved on the
   requestqueue while locking was stopped. */
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
4043
4044/* This is called by the midcomms layer when something is received for
4045 the lockspace. It could be either a MSG (normal message sent as part of
4046 standard locking activity) or an RCOM (recovery message sent as part of
4047 lockspace recovery). */
4048
eef7d739 4049void dlm_receive_buffer(union dlm_packet *p, int nodeid)
c36258b5 4050{
eef7d739 4051 struct dlm_header *hd = &p->header;
c36258b5
DT
4052 struct dlm_ls *ls;
4053 int type = 0;
4054
4055 switch (hd->h_cmd) {
4056 case DLM_MSG:
eef7d739
AV
4057 dlm_message_in(&p->message);
4058 type = p->message.m_type;
c36258b5
DT
4059 break;
4060 case DLM_RCOM:
eef7d739
AV
4061 dlm_rcom_in(&p->rcom);
4062 type = p->rcom.rc_type;
c36258b5
DT
4063 break;
4064 default:
4065 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4066 return;
4067 }
4068
4069 if (hd->h_nodeid != nodeid) {
4070 log_print("invalid h_nodeid %d from %d lockspace %x",
4071 hd->h_nodeid, nodeid, hd->h_lockspace);
4072 return;
4073 }
4074
4075 ls = dlm_find_lockspace_global(hd->h_lockspace);
4076 if (!ls) {
594199eb
DT
4077 if (dlm_config.ci_log_debug)
4078 log_print("invalid lockspace %x from %d cmd %d type %d",
4079 hd->h_lockspace, nodeid, hd->h_cmd, type);
c36258b5
DT
4080
4081 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
eef7d739 4082 dlm_send_ls_not_ready(nodeid, &p->rcom);
c36258b5
DT
4083 return;
4084 }
4085
4086 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4087 be inactive (in this ls) before transitioning to recovery mode */
4088
4089 down_read(&ls->ls_recv_active);
4090 if (hd->h_cmd == DLM_MSG)
eef7d739 4091 dlm_receive_message(ls, &p->message, nodeid);
c36258b5 4092 else
eef7d739 4093 dlm_receive_rcom(ls, &p->rcom, nodeid);
c36258b5
DT
4094 up_read(&ls->ls_recv_active);
4095
4096 dlm_put_lockspace(ls);
4097}
e7fd4179 4098
2a7ce0ed
DT
4099static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100 struct dlm_message *ms_stub)
e7fd4179
DT
4101{
4102 if (middle_conversion(lkb)) {
4103 hold_lkb(lkb);
2a7ce0ed
DT
4104 memset(ms_stub, 0, sizeof(struct dlm_message));
4105 ms_stub->m_flags = DLM_IFL_STUB_MS;
4106 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4107 ms_stub->m_result = -EINPROGRESS;
4108 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4109 _receive_convert_reply(lkb, ms_stub);
e7fd4179
DT
4110
4111 /* Same special case as in receive_rcom_lock_args() */
4112 lkb->lkb_grmode = DLM_LOCK_IV;
4113 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4114 unhold_lkb(lkb);
4115
4116 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4117 lkb->lkb_flags |= DLM_IFL_RESEND;
4118 }
4119
4120 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4121 conversions are async; there's no reply from the remote master */
4122}
4123
4124/* A waiting lkb needs recovery if the master node has failed, or
4125 the master node is changing (only when no directory is used) */
4126
4127static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4128{
4129 if (dlm_is_removed(ls, lkb->lkb_nodeid))
4130 return 1;
4131
4132 if (!dlm_no_directory(ls))
4133 return 0;
4134
4135 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4136 return 1;
4137
4138 return 0;
4139}
4140
4141/* Recovery for locks that are waiting for replies from nodes that are now
4142 gone. We can just complete unlocks and cancels by faking a reply from the
4143 dead node. Requests and up-conversions we flag to be resent after
4144 recovery. Down-conversions can just be completed with a fake reply like
4145 unlocks. Conversions between PR and CW need special attention. */
4146
4147void dlm_recover_waiters_pre(struct dlm_ls *ls)
4148{
4149 struct dlm_lkb *lkb, *safe;
2a7ce0ed 4150 struct dlm_message *ms_stub;
601342ce 4151 int wait_type, stub_unlock_result, stub_cancel_result;
e7fd4179 4152
a22ca480 4153 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
2a7ce0ed
DT
4154 if (!ms_stub) {
4155 log_error(ls, "dlm_recover_waiters_pre no mem");
4156 return;
4157 }
4158
90135925 4159 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
4160
4161 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
2a7ce0ed
DT
4162
4163 /* exclude debug messages about unlocks because there can be so
4164 many and they aren't very interesting */
4165
4166 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4167 log_debug(ls, "recover_waiter %x nodeid %d "
4168 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4169 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4170 }
e7fd4179
DT
4171
4172 /* all outstanding lookups, regardless of destination will be
4173 resent after recovery is done */
4174
4175 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4176 lkb->lkb_flags |= DLM_IFL_RESEND;
4177 continue;
4178 }
4179
4180 if (!waiter_needs_recovery(ls, lkb))
4181 continue;
4182
601342ce
DT
4183 wait_type = lkb->lkb_wait_type;
4184 stub_unlock_result = -DLM_EUNLOCK;
4185 stub_cancel_result = -DLM_ECANCEL;
4186
4187 /* Main reply may have been received leaving a zero wait_type,
4188 but a reply for the overlapping op may not have been
4189 received. In that case we need to fake the appropriate
4190 reply for the overlap op. */
4191
4192 if (!wait_type) {
4193 if (is_overlap_cancel(lkb)) {
4194 wait_type = DLM_MSG_CANCEL;
4195 if (lkb->lkb_grmode == DLM_LOCK_IV)
4196 stub_cancel_result = 0;
4197 }
4198 if (is_overlap_unlock(lkb)) {
4199 wait_type = DLM_MSG_UNLOCK;
4200 if (lkb->lkb_grmode == DLM_LOCK_IV)
4201 stub_unlock_result = -ENOENT;
4202 }
4203
4204 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4205 lkb->lkb_id, lkb->lkb_flags, wait_type,
4206 stub_cancel_result, stub_unlock_result);
4207 }
4208
4209 switch (wait_type) {
e7fd4179
DT
4210
4211 case DLM_MSG_REQUEST:
4212 lkb->lkb_flags |= DLM_IFL_RESEND;
4213 break;
4214
4215 case DLM_MSG_CONVERT:
2a7ce0ed 4216 recover_convert_waiter(ls, lkb, ms_stub);
e7fd4179
DT
4217 break;
4218
4219 case DLM_MSG_UNLOCK:
4220 hold_lkb(lkb);
2a7ce0ed
DT
4221 memset(ms_stub, 0, sizeof(struct dlm_message));
4222 ms_stub->m_flags = DLM_IFL_STUB_MS;
4223 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4224 ms_stub->m_result = stub_unlock_result;
4225 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4226 _receive_unlock_reply(lkb, ms_stub);
b3f58d8f 4227 dlm_put_lkb(lkb);
e7fd4179
DT
4228 break;
4229
4230 case DLM_MSG_CANCEL:
4231 hold_lkb(lkb);
2a7ce0ed
DT
4232 memset(ms_stub, 0, sizeof(struct dlm_message));
4233 ms_stub->m_flags = DLM_IFL_STUB_MS;
4234 ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4235 ms_stub->m_result = stub_cancel_result;
4236 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4237 _receive_cancel_reply(lkb, ms_stub);
b3f58d8f 4238 dlm_put_lkb(lkb);
e7fd4179
DT
4239 break;
4240
4241 default:
601342ce
DT
4242 log_error(ls, "invalid lkb wait_type %d %d",
4243 lkb->lkb_wait_type, wait_type);
e7fd4179 4244 }
81456807 4245 schedule();
e7fd4179 4246 }
90135925 4247 mutex_unlock(&ls->ls_waiters_mutex);
2a7ce0ed 4248 kfree(ms_stub);
e7fd4179
DT
4249}
4250
ef0c2bb0 4251static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
e7fd4179
DT
4252{
4253 struct dlm_lkb *lkb;
ef0c2bb0 4254 int found = 0;
e7fd4179 4255
90135925 4256 mutex_lock(&ls->ls_waiters_mutex);
e7fd4179
DT
4257 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4258 if (lkb->lkb_flags & DLM_IFL_RESEND) {
ef0c2bb0
DT
4259 hold_lkb(lkb);
4260 found = 1;
e7fd4179
DT
4261 break;
4262 }
4263 }
90135925 4264 mutex_unlock(&ls->ls_waiters_mutex);
e7fd4179 4265
ef0c2bb0 4266 if (!found)
e7fd4179 4267 lkb = NULL;
ef0c2bb0 4268 return lkb;
e7fd4179
DT
4269}
4270
/* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
   master or dir-node for r.  Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed.  We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations.  But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a pos wait_count; if after, the
   overlap flag would just have been set and nothing new sent.  we can be
   confident here than any replies to either the initial op or overlap ops
   prior to recovery have been received. */

/* Returns 0 when all resend-marked waiters have been processed, or -EINTR
   if locking is stopped again (recovery restarted) part way through. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/* takes a reference on the lkb which we drop at the bottom */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/* capture the pre-cleanup state before clearing it below */
		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			/* no overlap op pending: redo the original request */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb); /* drops the find_resend_waiter() ref */
	}

	return error;
}
4376
4377static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4378 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4379{
4380 struct dlm_ls *ls = r->res_ls;
4381 struct dlm_lkb *lkb, *safe;
4382
4383 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4384 if (test(ls, lkb)) {
97a35d1e 4385 rsb_set_flag(r, RSB_LOCKS_PURGED);
e7fd4179
DT
4386 del_lkb(r, lkb);
4387 /* this put should free the lkb */
b3f58d8f 4388 if (!dlm_put_lkb(lkb))
e7fd4179
DT
4389 log_error(ls, "purged lkb not released");
4390 }
4391 }
4392}
4393
4394static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4395{
4396 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4397}
4398
/* Queue test: true for any master-copy lkb; node state is not consulted
   (the ls argument is unused, kept for the purge_queue callback type). */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
4403
4404static void purge_dead_locks(struct dlm_rsb *r)
4405{
4406 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4407 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4408 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4409}
4410
4411void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4412{
4413 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4414 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4415 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4416}
4417
/* Get rid of locks held by nodes that are gone. */

/* Walks every root rsb under ls_root_sem; always returns 0. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		/* dead locks are purged only on resources we master */
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* yield between resources; the root list can be long */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
4441
97a35d1e
DT
4442static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4443{
4444 struct dlm_rsb *r, *r_ret = NULL;
4445
c7be761a 4446 spin_lock(&ls->ls_rsbtbl[bucket].lock);
97a35d1e
DT
4447 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4448 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4449 continue;
4450 hold_rsb(r);
4451 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4452 r_ret = r;
4453 break;
4454 }
c7be761a 4455 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
97a35d1e
DT
4456 return r_ret;
4457}
4458
4459void dlm_grant_after_purge(struct dlm_ls *ls)
e7fd4179
DT
4460{
4461 struct dlm_rsb *r;
2b4e926a 4462 int bucket = 0;
e7fd4179 4463
2b4e926a
DT
4464 while (1) {
4465 r = find_purged_rsb(ls, bucket);
4466 if (!r) {
4467 if (bucket == ls->ls_rsbtbl_size - 1)
4468 break;
4469 bucket++;
97a35d1e 4470 continue;
2b4e926a 4471 }
97a35d1e
DT
4472 lock_rsb(r);
4473 if (is_master(r)) {
4474 grant_pending_locks(r);
4475 confirm_master(r, 0);
e7fd4179 4476 }
97a35d1e
DT
4477 unlock_rsb(r);
4478 put_rsb(r);
2b4e926a 4479 schedule();
e7fd4179 4480 }
e7fd4179
DT
4481}
4482
4483static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4484 uint32_t remid)
4485{
4486 struct dlm_lkb *lkb;
4487
4488 list_for_each_entry(lkb, head, lkb_statequeue) {
4489 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4490 return lkb;
4491 }
4492 return NULL;
4493}
4494
4495static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4496 uint32_t remid)
4497{
4498 struct dlm_lkb *lkb;
4499
4500 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4501 if (lkb)
4502 return lkb;
4503 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4504 if (lkb)
4505 return lkb;
4506 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4507 if (lkb)
4508 return lkb;
4509 return NULL;
4510}
4511
ae773d0b 4512/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
4513static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4514 struct dlm_rsb *r, struct dlm_rcom *rc)
4515{
4516 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
e7fd4179
DT
4517
4518 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
163a1859
AV
4519 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4520 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4521 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4522 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
e7fd4179 4523 lkb->lkb_flags |= DLM_IFL_MSTCPY;
163a1859 4524 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
e7fd4179
DT
4525 lkb->lkb_rqmode = rl->rl_rqmode;
4526 lkb->lkb_grmode = rl->rl_grmode;
4527 /* don't set lkb_status because add_lkb wants to itself */
4528
8304d6f2
DT
4529 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4530 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
e7fd4179 4531
e7fd4179 4532 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
a5dd0631
AV
4533 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4534 sizeof(struct rcom_lock);
4535 if (lvblen > ls->ls_lvblen)
4536 return -EINVAL;
52bda2b5 4537 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
e7fd4179
DT
4538 if (!lkb->lkb_lvbptr)
4539 return -ENOMEM;
e7fd4179
DT
4540 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4541 }
4542
4543 /* Conversions between PR and CW (middle modes) need special handling.
4544 The real granted mode of these converting locks cannot be determined
4545 until all locks have been rebuilt on the rsb (recover_conversion) */
4546
163a1859
AV
4547 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4548 middle_conversion(lkb)) {
e7fd4179
DT
4549 rl->rl_status = DLM_LKSTS_CONVERT;
4550 lkb->lkb_grmode = DLM_LOCK_IV;
4551 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4552 }
4553
4554 return 0;
4555}
4556
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* parent (hierarchical) locks are not supported */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	/* already recovered in an earlier, aborted recovery pass? */
	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	/* result travels back to the sender inside the same rcom_lock */
	rl->rl_result = cpu_to_le32(error);
	return error;
}
4618
ae773d0b 4619/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
4620int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4621{
4622 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4623 struct dlm_rsb *r;
4624 struct dlm_lkb *lkb;
4625 int error;
4626
163a1859 4627 error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
e7fd4179 4628 if (error) {
163a1859
AV
4629 log_error(ls, "recover_process_copy no lkid %x",
4630 le32_to_cpu(rl->rl_lkid));
e7fd4179
DT
4631 return error;
4632 }
4633
4634 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4635
163a1859 4636 error = le32_to_cpu(rl->rl_result);
e7fd4179
DT
4637
4638 r = lkb->lkb_resource;
4639 hold_rsb(r);
4640 lock_rsb(r);
4641
4642 switch (error) {
dc200a88
DT
4643 case -EBADR:
4644 /* There's a chance the new master received our lock before
4645 dlm_recover_master_reply(), this wouldn't happen if we did
4646 a barrier between recover_masters and recover_locks. */
4647 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4648 (unsigned long)r, r->res_name);
4649 dlm_send_rcom_lock(r, lkb);
4650 goto out;
e7fd4179
DT
4651 case -EEXIST:
4652 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4653 /* fall through */
4654 case 0:
163a1859 4655 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
e7fd4179
DT
4656 break;
4657 default:
4658 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4659 error, lkb->lkb_id);
4660 }
4661
4662 /* an ack for dlm_recover_locks() which waits for replies from
4663 all the locks it sends to new masters */
4664 dlm_recovered_lock(r);
dc200a88 4665 out:
e7fd4179
DT
4666 unlock_rsb(r);
4667 put_rsb(r);
b3f58d8f 4668 dlm_put_lkb(lkb);
e7fd4179
DT
4669
4670 return 0;
4671}
4672
597d0cae
DT
/* Acquire a new lock on behalf of a userspace process.  ua ownership: on
   early failure it's freed here directly; once attached via set_lock_args
   it is freed with the lkb (see comment below).  The new lkb is added to
   the per-process locks list on success or -EINPROGRESS. */

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	lkb->lkb_flags |= DLM_IFL_USER;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		/* async grant pending: not an error to the caller */
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);	/* reference for the proc->locks list */
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4737
/* Convert an existing userspace lock to a new mode.  ua_tmp carries the
   latest parameters from userspace and is always freed before returning;
   the values are copied into the ua attached to the lkb. */

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	/* refresh the attached ua with the caller's latest values */
	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* these are async completions, not failures, to the caller */
	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4791
/* Unlock a userspace lock.  On success the lkb is moved from the process's
   locks list to its unlocking list (unless an ast has already removed it).
   ua_tmp is always freed before returning. */

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4840
/* Cancel an in-progress userspace lock operation.  ua_tmp is always freed
   before returning. */

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4878
8b4021fa
DT
/* Cancel a userspace lock as part of deadlock resolution.  Like
   dlm_user_cancel(), but marks the lkb DEADLOCK_CANCEL while the rsb is
   locked so the cancel completion can be told apart. */

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4926
ef0c2bb0
DT
4927/* lkb's that are removed from the waiters list by revert are just left on the
4928 orphans list with the granted orphan locks, to be freed by purge */
4929
597d0cae
DT
4930static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4931{
ef0c2bb0
DT
4932 struct dlm_args args;
4933 int error;
597d0cae 4934
ef0c2bb0
DT
4935 hold_lkb(lkb);
4936 mutex_lock(&ls->ls_orphans_mutex);
4937 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4938 mutex_unlock(&ls->ls_orphans_mutex);
597d0cae 4939
d292c0cc 4940 set_unlock_args(0, lkb->lkb_ua, &args);
ef0c2bb0
DT
4941
4942 error = cancel_lock(ls, lkb, &args);
4943 if (error == -DLM_ECANCEL)
4944 error = 0;
4945 return error;
597d0cae
DT
4946}
4947
4948/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4949 Regardless of what rsb queue the lock is on, it's removed and freed. */
4950
4951static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4952{
597d0cae
DT
4953 struct dlm_args args;
4954 int error;
4955
d292c0cc 4956 set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
597d0cae
DT
4957
4958 error = unlock_lock(ls, lkb, &args);
4959 if (error == -DLM_EUNLOCK)
4960 error = 0;
4961 return error;
4962}
4963
ef0c2bb0
DT
4964/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4965 (which does lock_rsb) due to deadlock with receiving a message that does
4966 lock_rsb followed by dlm_user_add_ast() */
4967
4968static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969 struct dlm_user_proc *proc)
4970{
4971 struct dlm_lkb *lkb = NULL;
4972
4973 mutex_lock(&ls->ls_clear_proc_locks);
4974 if (list_empty(&proc->locks))
4975 goto out;
4976
4977 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4978 list_del_init(&lkb->lkb_ownqueue);
4979
4980 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4981 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4982 else
4983 lkb->lkb_flags |= DLM_IFL_DEAD;
4984 out:
4985 mutex_unlock(&ls->ls_clear_proc_locks);
4986 return lkb;
4987}
4988
597d0cae
DT
/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourself. */

/* Release every lock a closing userspace process still holds: persistent
   locks become orphans, the rest are force-unlocked, and in-progress
   unlocks and queued asts are discarded. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* drop any callbacks still queued for delivery to the process */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
a1bc86e6 5041
8499137d
DT
/* Like dlm_clear_proc_locks() but driven by an explicit purge request from
   the process itself: all its locks (persistent included) are force-unlocked
   and its unlocking/ast lists are emptied.  Uses the proc spinlocks since
   the process is still live. */

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		/* pop one lock at a time so locks_spin is never held across
		   unlock_proc_lock() */
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
5081
5082/* pid of 0 means purge all orphans */
5083
5084static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5085{
5086 struct dlm_lkb *lkb, *safe;
5087
5088 mutex_lock(&ls->ls_orphans_mutex);
5089 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5090 if (pid && lkb->lkb_ownpid != pid)
5091 continue;
5092 unlock_proc_lock(ls, lkb);
5093 list_del_init(&lkb->lkb_ownqueue);
5094 dlm_put_lkb(lkb);
5095 }
5096 mutex_unlock(&ls->ls_orphans_mutex);
5097}
5098
5099static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5100{
5101 struct dlm_message *ms;
5102 struct dlm_mhandle *mh;
5103 int error;
5104
5105 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5106 DLM_MSG_PURGE, &ms, &mh);
5107 if (error)
5108 return error;
5109 ms->m_nodeid = nodeid;
5110 ms->m_pid = pid;
5111
5112 return send_message(mh, ms);
5113}
5114
5115int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5116 int nodeid, int pid)
5117{
5118 int error = 0;
5119
5120 if (nodeid != dlm_our_nodeid()) {
5121 error = send_purge(ls, nodeid, pid);
5122 } else {
85e86edf 5123 dlm_lock_recovery(ls);
8499137d
DT
5124 if (pid == current->pid)
5125 purge_proc_locks(ls, proc);
5126 else
5127 do_purge(ls, nodeid, pid);
85e86edf 5128 dlm_unlock_recovery(ls);
8499137d
DT
5129 }
5130 return error;
5131}
5132