fs/dlm/lock.c

/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
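
/* Illustrative example (not part of the original file): per the staging
   described above, a new request from node L for a resource mastered on
   node R moves through the code as

       dlm_lock()                  L: stage 1, validate args
         request_lock()            L: stage 2, find and lock the rsb
           _request_lock()         L: stage 3, rsb is remote, so
             send_request()        L  ->  R: receive_request()
                                          R: do_request()   stage 4 on master
       receive_request_reply()     L  <-  R: send_request_reply()

   The same pattern holds for convert, unlock and cancel. */
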
#include <linux/types.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
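
/* Illustrative example (not part of the original file): dropping an EX lock
   down to NL writes the caller's LVB to the resource,
       dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0
   while converting NL up to EX returns the resource's LVB to the caller,
       dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1 */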

#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
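
/* Illustrative example (not part of the original file): two readers can
   coexist but a reader blocks a writer,
       dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1
       dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0 */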

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               " status %d rqmode %d grmode %d wait_type %d\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
               r->res_nodeid, r->res_flags, r->res_first_lkid,
               r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
        if (lkb->lkb_flags & DLM_IFL_MSTCPY)
                DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
                                  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        del_timeout(lkb);

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
           timeout caused the cancel then return -ETIMEDOUT */
        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
                rv = -ETIMEDOUT;
        }

        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
                rv = -EDEADLK;
        }

        dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        queue_cast(r, lkb,
                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb)) {
                send_bast(r, lkb, rqmode);
        } else {
                dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
        }
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
        struct dlm_rsb *r;

        r = dlm_allocate_rsb(ls, len);
        if (!r)
                return NULL;

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
                           unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error = 0;

        list_for_each_entry(r, head, res_hashchain) {
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
        *r_ret = NULL;
        return -EBADR;

 found:
        if (r->res_nodeid && (flags & R_MASTER))
                error = -ENOTBLK;
        *r_ret = r;
        return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                       unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int error;

        error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
        if (!error) {
                kref_get(&r->res_ref);
                goto out;
        }
        error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;

        list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

        if (dlm_no_directory(ls))
                goto out;

        if (r->res_nodeid == -1) {
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else if (r->res_nodeid > 0) {
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        } else {
                DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
                DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
        }
 out:
        *r_ret = r;
        return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                      unsigned int flags, struct dlm_rsb **r_ret)
{
        int error;
        spin_lock(&ls->ls_rsbtbl[b].lock);
        error = _search_rsb(ls, name, len, b, flags, r_ret);
        spin_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
                    unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r = NULL, *tmp;
        uint32_t hash, bucket;
        int error = -EINVAL;

        if (namelen > DLM_RESNAME_MAXLEN)
                goto out;

        if (dlm_no_directory(ls))
                flags |= R_CREATE;

        error = 0;
        hash = jhash(name, namelen, 0);
        bucket = hash & (ls->ls_rsbtbl_size - 1);

        error = search_rsb(ls, name, namelen, bucket, flags, &r);
        if (!error)
                goto out;

        if (error == -EBADR && !(flags & R_CREATE))
                goto out;

        /* the rsb was found but wasn't a master copy */
        if (error == -ENOTBLK)
                goto out;

        error = -ENOMEM;
        r = create_rsb(ls, name, namelen);
        if (!r)
                goto out;

        r->res_hash = hash;
        r->res_bucket = bucket;
        r->res_nodeid = -1;
        kref_init(&r->res_ref);

        /* With no directory, the master can be set immediately */
        if (dlm_no_directory(ls)) {
                int nodeid = dlm_dir_nodeid(r);
                if (nodeid == dlm_our_nodeid())
                        nodeid = 0;
                r->res_nodeid = nodeid;
        }

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
        if (!error) {
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
                dlm_free_rsb(r);
                r = tmp;
                goto out;
        }
        list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
        error = 0;
 out:
        *r_ret = r;
        return error;
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
        kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        if (r->res_lvbptr) {
                dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;

        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        kref_put(&r->res_ref, toss_rsb);
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;
        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb, *tmp;
        uint32_t lkid = 0;
        uint16_t bucket;

        lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
        INIT_LIST_HEAD(&lkb->lkb_time_list);
        INIT_LIST_HEAD(&lkb->lkb_astqueue);

        get_random_bytes(&bucket, sizeof(bucket));
        bucket &= (ls->ls_lkbtbl_size - 1);

        write_lock(&ls->ls_lkbtbl[bucket].lock);

        /* counter can roll over so we must verify lkid is not in use */

        while (lkid == 0) {
                lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

                list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
                                    lkb_idtbl_list) {
                        if (tmp->lkb_id != lkid)
                                continue;
                        lkid = 0;
                        break;
                }
        }

        lkb->lkb_id = lkid;
        list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
        write_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
                if (lkb->lkb_id == lkid)
                        return lkb;
        }
        return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;
        uint16_t bucket = (lkid >> 16);

        if (bucket >= ls->ls_lkbtbl_size)
                return -EBADSLT;

        read_lock(&ls->ls_lkbtbl[bucket].lock);
        lkb = __find_lkb(ls, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        read_unlock(&ls->ls_lkbtbl[bucket].lock);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}
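
/* Illustrative example (not part of the original file): the lock id packs
   the table bucket into the top 16 bits and the per-bucket counter into
   the bottom 16, so with bucket 0x0003 and counter 0x0001,
       lkid = (0x0003 << 16) | 0x0001 = 0x00030001
   and find_lkb() recovers the bucket with lkid >> 16. */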

static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint16_t bucket = (lkb->lkb_id >> 16);

        write_lock(&ls->ls_lkbtbl[bucket].lock);
        if (kref_put(&lkb->lkb_ref, kill_lkb)) {
                list_del(&lkb->lkb_idtbl_list);
                write_unlock(&ls->ls_lkbtbl[bucket].lock);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        dlm_free_lvb(lkb->lkb_lvbptr);
                dlm_free_lkb(lkb);
                return 1;
        } else {
                write_unlock(&ls->ls_lkbtbl[bucket].lock);
                return 0;
        }
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        int rv;
        rv = kref_put(&lkb->lkb_ref, kill_lkb);
        DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL;

        list_for_each_entry(lkb, head, lkb_statequeue)
                if (lkb->lkb_rqmode < mode)
                        break;

        __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
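
/* Illustrative example (not part of the original file): the walk above stops
   at the first entry whose mode is lower than the new one, so inserting mode
   PR into a queue whose entries have modes EX, CW, NL gives
       EX, PR, CW, NL
   i.e. the queue stays sorted from highest to lowest mode. */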

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_timestamp = ktime_get();

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}

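/* Illustrative note (not part of the original file): move_lkb() brackets
   del_lkb()/add_lkb() with hold_lkb()/unhold_lkb() so the reference dropped
   inside del_lkb() can never be the last one while the lkb is between
   queues. */
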
static int msg_reply_type(int mstype)
{
        switch (mstype) {
        case DLM_MSG_REQUEST:
                return DLM_MSG_REQUEST_REPLY;
        case DLM_MSG_CONVERT:
                return DLM_MSG_CONVERT_REPLY;
        case DLM_MSG_UNLOCK:
                return DLM_MSG_UNLOCK_REPLY;
        case DLM_MSG_CANCEL:
                return DLM_MSG_CANCEL_REPLY;
        case DLM_MSG_LOOKUP:
                return DLM_MSG_LOOKUP_REPLY;
        }
        return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
        int i;

        for (i = 0; i < num_nodes; i++) {
                if (!warned[i]) {
                        warned[i] = nodeid;
                        return 0;
                }
                if (warned[i] == nodeid)
                        return 1;
        }
        return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        ktime_t zero = ktime_set(0, 0);
        s64 us;
        s64 debug_maxus = 0;
        u32 debug_scanned = 0;
        u32 debug_expired = 0;
        int num_nodes = 0;
        int *warned = NULL;

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);

        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_equal(lkb->lkb_wait_time, zero))
                        continue;

                debug_scanned++;

                us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

                if (us < dlm_config.ci_waitwarn_us)
                        continue;

                lkb->lkb_wait_time = zero;

                debug_expired++;
                if (us > debug_maxus)
                        debug_maxus = us;

                if (!num_nodes) {
                        num_nodes = ls->ls_num_nodes;
                        warned = kmalloc(num_nodes * sizeof(int), GFP_KERNEL);
                        if (warned)
                                memset(warned, 0, num_nodes * sizeof(int));
                }
                if (!warned)
                        continue;
                if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
                        continue;

                log_error(ls, "waitwarn %x %lld %d us check connection to "
                          "node %d", lkb->lkb_id, (long long)us,
                          dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
        }
        mutex_unlock(&ls->ls_waiters_mutex);

        if (warned)
                kfree(warned);

        if (debug_expired)
                log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
                          debug_scanned, debug_expired,
                          dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_time = ktime_get();
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
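
/* Illustrative example (not part of the original file): if an app issues
   dlm_lock() (a DLM_MSG_REQUEST goes on the waiters list) and then calls
   dlm_unlock() before the request reply arrives, the second add_to_waiters()
   call sees lkb_wait_type set, marks DLM_IFL_OVERLAP_UNLOCK and bumps
   lkb_wait_count to 2 instead of failing with -EBUSY. */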

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                                struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* Cancel state was preemptively cleared by a successful convert,
           see next comment, nothing to do. */

        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
                log_debug(ls, "remwait %x cancel_reply wait_type %d",
                          lkb->lkb_id, lkb->lkb_wait_type);
                return -1;
        }

        /* Remove for the convert reply, and preemptively remove for the
           cancel reply.  A convert has been granted while there's still
           an outstanding cancel on it (the cancel is moot and the result
           in the cancel reply should be 0).  We preempt the cancel reply
           because the app gets the convert result and then can follow up
           with another op, like convert.  This subsequent op would see the
           lingering state of the cancel and fail with -EBUSY. */

        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
            is_overlap_cancel(lkb) && ms && !ms->m_result) {
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                lkb->lkb_wait_count--;
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remwait error %x reply %d flags %x no wait_type",
                  lkb->lkb_id, mstype, lkb->lkb_flags);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't recvd a reply
           to the op that was in progress prior to the unlock/cancel; we
           give up on any reply to the earlier op.  FIXME: not sure when/how
           this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        if (ms != &ls->ls_stub_ms)
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, ms->m_type, ms);
        if (ms != &ls->ls_stub_ms)
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}

static void dir_remove(struct dlm_rsb *r)
{
        int to_nodeid;

        if (dlm_no_directory(r->res_ls))
                return;

        to_nodeid = dlm_dir_nodeid(r);
        if (to_nodeid != dlm_our_nodeid())
                send_remove(r);
        else
                dlm_dir_remove_entry(r->res_ls, to_nodeid,
                                     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
        struct dlm_rsb *r;
        int count = 0, found;

        for (;;) {
                found = 0;
                spin_lock(&ls->ls_rsbtbl[b].lock);
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
                                           dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
                }

                if (!found) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        break;
                }

                if (kref_put(&r->res_ref, kill_rsb)) {
                        list_del(&r->res_hashchain);
                        spin_unlock(&ls->ls_rsbtbl[b].lock);

                        if (is_master(r))
                                dir_remove(r);
                        dlm_free_rsb(r);
                        count++;
                } else {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                }
        }

        return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                if (dlm_locking_stopped(ls))
                        break;
                cond_resched();
        }
}

static void add_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        if (is_master_copy(lkb))
                return;

        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
                goto add_it;
        }
        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
                goto add_it;
        return;

 add_it:
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        mutex_lock(&ls->ls_timeout_mutex);
        if (!list_empty(&lkb->lkb_time_list)) {
                list_del_init(&lkb->lkb_time_list);
                unhold_lkb(lkb);
        }
        mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
        int do_cancel, do_warn;
        s64 wait_us;

        for (;;) {
                if (dlm_locking_stopped(ls))
                        break;

                do_cancel = 0;
                do_warn = 0;
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
                                                        lkb->lkb_timestamp));

                        if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
                            wait_us >= (lkb->lkb_timeout_cs * 10000))
                                do_cancel = 1;

                        if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;

                        if (!do_cancel && !do_warn)
                                continue;
                        hold_lkb(lkb);
                        break;
                }
                mutex_unlock(&ls->ls_timeout_mutex);

                if (!do_cancel && !do_warn)
                        break;

                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                if (do_warn) {
                        /* clear flag so we only warn once */
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
                                del_timeout(lkb);
                        dlm_timeout_warn(lkb);
                }

                if (do_cancel) {
                        log_debug(ls, "timeout cancel %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
                        del_timeout(lkb);
                        _cancel_lock(r, lkb);
                }

                unlock_rsb(r);
                unhold_rsb(r);
                dlm_put_lkb(lkb);
        }
}
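
/* Illustrative example (not part of the original file): timeouts are
   configured in centiseconds, so the comparisons above scale them by 10000
   to microseconds.  A lock taken with a timeout_cs of 500 (5 seconds) is
   cancelled once wait_us reaches 500 * 10000 = 5,000,000. */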

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);

        if (!dlm_config.ci_waitwarn_us)
                return;

        mutex_lock(&ls->ls_waiters_mutex);
        list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
                if (ktime_to_us(lkb->lkb_wait_time))
                        lkb->lkb_wait_time = ktime_get();
        }
        mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int b, len = r->res_ls->ls_lvblen;

        /* b=1 lvb returned to caller
           b=0 lvb written to rsb or invalidated
           b=-1 do nothing */

        b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

        if (b == 1) {
                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        return;

                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
                lkb->lkb_lvbseq = r->res_lvbseq;

        } else if (b == 0) {
                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                        rsb_set_flag(r, RSB_VALNOTVALID);
                        return;
                }

                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

                if (!r->res_lvbptr)
                        return;

                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
                r->res_lvbseq++;
                lkb->lkb_lvbseq = r->res_lvbseq;
                rsb_clear_flag(r, RSB_VALNOTVALID);
        }

        if (rsb_flag(r, RSB_VALNOTVALID))
                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode < DLM_LOCK_PW)
                return;

        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        if (!r->res_lvbptr)
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

        if (!r->res_lvbptr)
                return;

        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
        r->res_lvbseq++;
        rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                            struct dlm_message *ms)
{
        int b;

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                if (len > DLM_RESNAME_MAXLEN)
                        len = DLM_RESNAME_MAXLEN;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = ms->m_lvbseq;
        }
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}

/* returns: 0 did nothing
            1 moved lock to granted
           -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = 0;

        lkb->lkb_rqmode = DLM_LOCK_IV;

        switch (lkb->lkb_status) {
        case DLM_LKSTS_GRANTED:
                break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                rv = 1;
                break;
        case DLM_LKSTS_WAITING:
                del_lkb(r, lkb);
                lkb->lkb_grmode = DLM_LOCK_IV;
                /* this unhold undoes the original ref from create_lkb()
                   so this leads to the lkb being freed */
                unhold_lkb(lkb);
                rv = -1;
                break;
        default:
                log_print("invalid status for revert %d", lkb->lkb_status);
        }
        return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
                lkb->lkb_grmode = lkb->lkb_rqmode;
                if (lkb->lkb_status)
                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                else
                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
        }

        lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
        lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);
        if (is_master_copy(lkb))
                send_grant(r, lkb);
        else
                queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
                log_print("munge_demoted %x invalid reply type %d",
                          lkb->lkb_id, ms->m_type);
                return;
        }

        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
                log_print("munge_demoted %x invalid modes gr %d rq %d",
                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
                return;
        }

        lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
            ms->m_type != DLM_MSG_GRANT) {
                log_print("munge_altmode %x invalid reply type %d",
                          lkb->lkb_id, ms->m_type);
                return;
        }

        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
                lkb->lkb_rqmode = DLM_LOCK_PR;
        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
                lkb->lkb_rqmode = DLM_LOCK_CW;
        else {
                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
                dlm_print_lkb(lkb);
        }
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return 1;

        return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
        struct dlm_lkb *this;

        list_for_each_entry(this, head, lkb_statequeue) {
                if (this == lkb)
                        continue;
                if (!modes_compat(this, lkb))
                        return 1;
        }
        return 0;
}

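/* Illustrative example (not part of the original file): with a CW lock on
   the queue, queue_conflict() for a new PR request finds
   modes_compat(CW, PR) == 0 in __dlm_compat_matrix and reports a conflict,
   while a second CW request (CW/CW == 1) passes. */
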
/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
        struct dlm_lkb *lkb1;
        int lkb_is_ahead = 0;

        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
                if (lkb1 == lkb2) {
                        lkb_is_ahead = 1;
                        continue;
                }

                if (!lkb_is_ahead) {
                        if (!modes_compat(lkb2, lkb1))
                                return 1;
                } else {
                        if (!modes_compat(lkb2, lkb1) &&
                            !modes_compat(lkb1, lkb2))
                                return 1;
                }
        }
        return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
        int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

        /*
         * 6-10: Version 5.4 introduced an option to address the phenomenon of
         * a new request for a NL mode lock being blocked.
         *
         * 6-11: If the optional EXPEDITE flag is used with the new NL mode
         * request, then it would be granted.  In essence, the use of this flag
         * tells the Lock Manager to expedite this request by not considering
         * what may be in the CONVERTING or WAITING queues...  As of this
         * writing, the EXPEDITE flag can be used only with new requests for NL
         * mode locks.  This flag is not valid for conversion requests.
         *
         * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
         * conversion or used with a non-NL requested mode.  We also know an
         * EXPEDITE request is always granted immediately, so now must always
         * be 1.  The full condition to grant an expedite request: (now &&
         * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
         * therefore be shortened to just checking the flag.
         */

        if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
                return 1;

        /*
         * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
         * added to the remaining conditions.
         */

        if (queue_conflict(&r->res_grantqueue, lkb))
                goto out;

        /*
         * 6-3: By default, a conversion request is immediately granted if the
         * requested mode is compatible with the modes of all other granted
         * locks
         */

        if (queue_conflict(&r->res_convertqueue, lkb))
                goto out;

        /*
         * 6-5: But the default algorithm for deciding whether to grant or
         * queue conversion requests does not by itself guarantee that such
         * requests are serviced on a "first come first serve" basis.  This, in
         * turn, can lead to a phenomenon known as "indefinite postponement".
         *
         * 6-7: This issue is dealt with by using the optional QUECVT flag with
         * the system service employed to request a lock conversion.  This flag
         * forces certain conversion requests to be queued, even if they are
         * compatible with the granted modes of other locks on the same
         * resource.  Thus, the use of this flag results in conversion requests
         * being ordered on a "first come first serve" basis.
         *
         * DCT: This condition is all about new conversions being able to occur
         * "in place" while the lock remains on the granted queue (assuming
         * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
         * doesn't _have_ to go onto the convert queue where it's processed in
         * order.  The "now" variable is necessary to distinguish converts
         * being received and processed for the first time now, because once a
         * convert is moved to the conversion queue the condition below applies
         * requiring fifo granting.
         */

        if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
                return 1;

        /*
         * The NOORDER flag is set to avoid the standard vms rules on grant
         * order.
         */

        if (lkb->lkb_exflags & DLM_LKF_NOORDER)
                return 1;

        /*
         * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
         * granted until all other conversion requests ahead of it are granted
         * and/or canceled.
         */

        if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
                return 1;

        /*
         * 6-4: By default, a new request is immediately granted only if all
         * three of the following conditions are satisfied when the request is
         * issued:
         * - The queue of ungranted conversion requests for the resource is
         *   empty.
         * - The queue of ungranted new requests for the resource is empty.
         * - The mode of the new request is compatible with the most
         *   restrictive mode of all granted locks on the resource.
         */

        if (now && !conv && list_empty(&r->res_convertqueue) &&
            list_empty(&r->res_waitqueue))
                return 1;

        /*
         * 6-4: Once a lock request is in the queue of ungranted new requests,
         * it cannot be granted until the queue of ungranted conversion
         * requests is empty, all ungranted new requests ahead of it are
         * granted and/or canceled, and it is compatible with the granted mode
         * of the most restrictive lock granted on the resource.
         */

        if (!now && !conv && list_empty(&r->res_convertqueue) &&
            first_in_list(lkb, &r->res_waitqueue))
                return 1;
 out:
        return 0;
}
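
/* Illustrative example (not part of the original file): with an EX lock on
   the grant queue, a new PR request fails the queue_conflict() test against
   the granted EX (matrix value 0) and is queued; once the EX lock is
   unlocked, the PR request is first on an otherwise empty wait queue with
   no pending converts, so the !now clause above grants it. */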

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
                          int *err)
{
        int rv;
        int8_t alt = 0, rqmode = lkb->lkb_rqmode;
        int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

        if (err)
                *err = 0;

        rv = _can_be_granted(r, lkb, now);
        if (rv)
                goto out;

        /*
         * The CONVDEADLK flag is non-standard and tells the dlm to resolve
         * conversion deadlocks by demoting grmode to NL, otherwise the dlm
         * cancels one of the locks.
         */

        if (is_convert && can_be_queued(lkb) &&
            conversion_deadlock_detect(r, lkb)) {
                if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
                        lkb->lkb_grmode = DLM_LOCK_NL;
                        lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
                } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                        if (err)
                                *err = -EDEADLK;
                        else {
                                log_print("can_be_granted deadlock %x now %d",
                                          lkb->lkb_id, now);
                                dlm_dump_rsb(r);
                        }
                }
                goto out;
        }

        /*
         * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
         * to grant a request in a mode other than the normal rqmode.  It's a
         * simple way to provide a big optimization to applications that can
         * use them.
         */

        if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
                alt = DLM_LOCK_PR;
        else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
                alt = DLM_LOCK_CW;

        if (alt) {
                lkb->lkb_rqmode = alt;
                rv = _can_be_granted(r, lkb, now);
                if (rv)
                        lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
                else
                        lkb->lkb_rqmode = rqmode;
        }
 out:
        return rv;
}

c85d65e9
DT
1780/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1781 for locks pending on the convert list. Once verified (watch for these
1782 log_prints), we should be able to just call _can_be_granted() and not
1783 bother with the demote/deadlk cases here (and there's no easy way to deal
1784 with a deadlk here, we'd have to generate something like grant_lock with
1785 the deadlk error.) */
1786
36509258
DT
1787/* Returns the highest requested mode of all blocked conversions; sets
1788 cw if there's a blocked conversion to DLM_LOCK_CW. */
c85d65e9 1789
36509258 1790static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
e7fd4179
DT
1791{
1792 struct dlm_lkb *lkb, *s;
1793 int hi, demoted, quit, grant_restart, demote_restart;
1794 int deadlk;
1795
1796 quit = 0;
1797 restart:
1798 grant_restart = 0;
1799 demote_restart = 0;
1800 hi = DLM_LOCK_IV;
1801
1802 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1803 demoted = is_demoted(lkb);
1804 deadlk = 0;
1805
1806 if (can_be_granted(r, lkb, 0, &deadlk)) {
1807 grant_lock_pending(r, lkb);
1808 grant_restart = 1;
1809 continue;
e7fd4179 1810 }
1811
1812 if (!demoted && is_demoted(lkb)) {
1813 log_print("WARN: pending demoted %x node %d %s",
1814 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1815 demote_restart = 1;
1816 continue;
1817 }
1818
1819 if (deadlk) {
1820 log_print("WARN: pending deadlock %x node %d %s",
1821 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1822 dlm_dump_rsb(r);
1823 continue;
1824 }
1825
1826 hi = max_t(int, lkb->lkb_rqmode, hi);
1827
1828 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1829 *cw = 1;
1830 }
1831
1832 if (grant_restart)
1833 goto restart;
1834 if (demote_restart && !quit) {
1835 quit = 1;
1836 goto restart;
1837 }
1838
1839 return max_t(int, high, hi);
1840}
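/* Note on the restart loop above (illustrative): when a conversion is
   auto-demoted to NL by CONVDEADLK inside can_be_granted(),
   demote_restart forces another pass over res_convertqueue, since
   conversions blocked only by the demoted lock's old grmode may now be
   grantable; quit limits the demote-driven rescan to one extra pass. */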
1841
1842static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1843{
1844 struct dlm_lkb *lkb, *s;
1845
1846 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1847 if (can_be_granted(r, lkb, 0, NULL))
1848 grant_lock_pending(r, lkb);
1849 else {
1850 high = max_t(int, lkb->lkb_rqmode, high);
1851 if (lkb->lkb_rqmode == DLM_LOCK_CW)
1852 *cw = 1;
1853 }
e7fd4179
DT
1854 }
1855
1856 return high;
1857}
1858
1859/* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1860 on either the convert or waiting queue.
1861 high is the largest rqmode of all locks blocked on the convert or
1862 waiting queue. */
1863
1864static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1865{
1866 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1867 if (gr->lkb_highbast < DLM_LOCK_EX)
1868 return 1;
1869 return 0;
1870 }
1871
1872 if (gr->lkb_highbast < high &&
1873 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1874 return 1;
1875 return 0;
1876}
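/* Example (illustrative): a lock granted in PR while an EX request is
   blocked (high == DLM_LOCK_EX) needs a blocking ast, since
   __dlm_compat_matrix[PR+1][EX+1] is 0; after the bast is queued,
   grant_pending_locks() below raises lkb_highbast to EX so the same
   conflict does not produce a second bast. */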
1877
1878static void grant_pending_locks(struct dlm_rsb *r)
1879{
1880 struct dlm_lkb *lkb, *s;
1881 int high = DLM_LOCK_IV;
1882 int cw = 0;
1883
1884 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1885
1886 high = grant_pending_convert(r, high, &cw);
1887 high = grant_pending_wait(r, high, &cw);
1888
1889 if (high == DLM_LOCK_IV)
1890 return;
1891
1892 /*
1893 * If there are locks left on the wait/convert queue then send blocking
1894 * ASTs to granted locks based on the largest requested mode (high)
1895 * found above.
1896 */
1897
1898 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1899 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1900 if (cw && high == DLM_LOCK_PR &&
1901 lkb->lkb_grmode == DLM_LOCK_PR)
1902 queue_bast(r, lkb, DLM_LOCK_CW);
1903 else
1904 queue_bast(r, lkb, high);
1905 lkb->lkb_highbast = high;
1906 }
1907 }
1908}
1909
1910static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1911{
1912 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1913 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1914 if (gr->lkb_highbast < DLM_LOCK_EX)
1915 return 1;
1916 return 0;
1917 }
1918
1919 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1920 return 1;
1921 return 0;
1922}
1923
1924static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1925 struct dlm_lkb *lkb)
1926{
1927 struct dlm_lkb *gr;
1928
1929 list_for_each_entry(gr, head, lkb_statequeue) {
1930 /* skip self when sending basts to convertqueue */
1931 if (gr == lkb)
1932 continue;
1933 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1934 queue_bast(r, gr, lkb->lkb_rqmode);
1935 gr->lkb_highbast = lkb->lkb_rqmode;
1936 }
1937 }
1938}
1939
1940static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1941{
1942 send_bast_queue(r, &r->res_grantqueue, lkb);
1943}
1944
1945static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1946{
1947 send_bast_queue(r, &r->res_grantqueue, lkb);
1948 send_bast_queue(r, &r->res_convertqueue, lkb);
1949}
1950
1951/* set_master(r, lkb) -- set the master nodeid of a resource
1952
1953 The purpose of this function is to set the nodeid field in the given
1954 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1955 known, it can just be copied to the lkb and the function will return
1956 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1957 before it can be copied to the lkb.
1958
1959 When the rsb nodeid is being looked up remotely, the initial lkb
1960 causing the lookup is kept on the ls_waiters list waiting for the
1961 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1962 on the rsb's res_lookup list until the master is verified.
1963
1964 Return values:
1965 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1966 1: the rsb master is not available and the lkb has been placed on
1967 a wait queue
1968*/
1969
1970static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1971{
1972 struct dlm_ls *ls = r->res_ls;
1973 int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1974
1975 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1976 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1977 r->res_first_lkid = lkb->lkb_id;
1978 lkb->lkb_nodeid = r->res_nodeid;
1979 return 0;
1980 }
1981
1982 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1983 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1984 return 1;
1985 }
1986
1987 if (r->res_nodeid == 0) {
1988 lkb->lkb_nodeid = 0;
1989 return 0;
1990 }
1991
1992 if (r->res_nodeid > 0) {
1993 lkb->lkb_nodeid = r->res_nodeid;
1994 return 0;
1995 }
1996
1997 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1998
1999 dir_nodeid = dlm_dir_nodeid(r);
2000
2001 if (dir_nodeid != our_nodeid) {
2002 r->res_first_lkid = lkb->lkb_id;
2003 send_lookup(r, lkb);
2004 return 1;
2005 }
2006
2007 for (i = 0; i < 2; i++) {
2008 /* It's possible for dlm_scand to remove an old rsb for
2009 this same resource from the toss list, for us to create
2010 a new one, look up the master locally, and find it
2011 already exists just before dlm_scand does the
2012 dir_remove() on the previous rsb. */
2013
2014 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2015 r->res_length, &ret_nodeid);
2016 if (!error)
2017 break;
2018 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2019 schedule();
2020 }
2021 if (error && error != -EEXIST)
2022 return error;
2023
2024 if (ret_nodeid == our_nodeid) {
2025 r->res_first_lkid = 0;
2026 r->res_nodeid = 0;
2027 lkb->lkb_nodeid = 0;
2028 } else {
2029 r->res_first_lkid = lkb->lkb_id;
2030 r->res_nodeid = ret_nodeid;
2031 lkb->lkb_nodeid = ret_nodeid;
2032 }
2033 return 0;
2034}
2035
2036static void process_lookup_list(struct dlm_rsb *r)
2037{
2038 struct dlm_lkb *lkb, *safe;
2039
2040 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2041 list_del_init(&lkb->lkb_rsb_lookup);
2042 _request_lock(r, lkb);
2043 schedule();
2044 }
2045}
2046
2047/* confirm_master -- confirm (or deny) an rsb's master nodeid */
2048
2049static void confirm_master(struct dlm_rsb *r, int error)
2050{
2051 struct dlm_lkb *lkb;
2052
2053 if (!r->res_first_lkid)
2054 return;
2055
2056 switch (error) {
2057 case 0:
2058 case -EINPROGRESS:
2059 r->res_first_lkid = 0;
2060 process_lookup_list(r);
2061 break;
2062
2063 case -EAGAIN:
2064 case -EBADR:
2065 case -ENOTBLK:
2066 /* the remote request failed and won't be retried (it was
2067 a NOQUEUE, or has been canceled/unlocked); make a waiting
2068 lkb the first_lkid */
2069
2070 r->res_first_lkid = 0;
2071
2072 if (!list_empty(&r->res_lookup)) {
2073 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2074 lkb_rsb_lookup);
2075 list_del_init(&lkb->lkb_rsb_lookup);
2076 r->res_first_lkid = lkb->lkb_id;
2077 _request_lock(r, lkb);
2078 }
2079 break;
2080
2081 default:
2082 log_error(r->res_ls, "confirm_master unknown error %d", error);
2083 }
2084}
2085
2086static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2087 int namelen, unsigned long timeout_cs,
2088 void (*ast) (void *astparam),
2089 void *astparam,
2090 void (*bast) (void *astparam, int mode),
2091 struct dlm_args *args)
2092{
2093 int rv = -EINVAL;
2094
2095 /* check for invalid arg usage */
2096
2097 if (mode < 0 || mode > DLM_LOCK_EX)
2098 goto out;
2099
2100 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2101 goto out;
2102
2103 if (flags & DLM_LKF_CANCEL)
2104 goto out;
2105
2106 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2107 goto out;
2108
2109 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2110 goto out;
2111
2112 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2113 goto out;
2114
2115 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2116 goto out;
2117
2118 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2119 goto out;
2120
2121 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2122 goto out;
2123
2124 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2125 goto out;
2126
2127 if (!ast || !lksb)
2128 goto out;
2129
2130 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2131 goto out;
2132
2133 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2134 goto out;
2135
2136 /* these args will be copied to the lkb in validate_lock_args;
2137 it cannot be done now because, when converting locks, fields in
2138 an active lkb cannot be modified before locking the rsb */
2139
2140 args->flags = flags;
2141 args->astfn = ast;
2142 args->astparam = astparam;
2143 args->bastfn = bast;
2144 args->timeout = timeout_cs;
2145 args->mode = mode;
2146 args->lksb = lksb;
2147 rv = 0;
2148 out:
2149 return rv;
2150}
2151
2152static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2153{
2154 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2155 DLM_LKF_FORCEUNLOCK))
2156 return -EINVAL;
2157
2158 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2159 return -EINVAL;
2160
2161 args->flags = flags;
2162 args->astparam = astarg;
2163 return 0;
2164}
2165
2166static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2167 struct dlm_args *args)
2168{
2169 int rv = -EINVAL;
2170
2171 if (args->flags & DLM_LKF_CONVERT) {
2172 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2173 goto out;
2174
2175 if (args->flags & DLM_LKF_QUECVT &&
2176 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2177 goto out;
2178
2179 rv = -EBUSY;
2180 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2181 goto out;
2182
2183 if (lkb->lkb_wait_type)
2184 goto out;
2185
2186 if (is_overlap(lkb))
2187 goto out;
2188 }
2189
2190 lkb->lkb_exflags = args->flags;
2191 lkb->lkb_sbflags = 0;
2192 lkb->lkb_astfn = args->astfn;
2193 lkb->lkb_astparam = args->astparam;
2194 lkb->lkb_bastfn = args->bastfn;
2195 lkb->lkb_rqmode = args->mode;
2196 lkb->lkb_lksb = args->lksb;
2197 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2198 lkb->lkb_ownpid = (int) current->pid;
2199 lkb->lkb_timeout_cs = args->timeout;
2200 rv = 0;
2201 out:
2202 if (rv)
2203 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2204 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2205 lkb->lkb_status, lkb->lkb_wait_type,
2206 lkb->lkb_resource->res_name);
2207 return rv;
2208}
2209
2210/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2211 for success */
2212
2213/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2214 because there may be a lookup in progress and it's valid to do
2215 cancel/unlockf on it */
2216
2217static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2218{
2219 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2220 int rv = -EINVAL;
2221
2222 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2223 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2224 dlm_print_lkb(lkb);
2225 goto out;
2226 }
2227
2228 /* an lkb may still exist even though the lock is EOL'ed due to a
2229 cancel, unlock or failed noqueue request; an app can't use these
2230 locks; return same error as if the lkid had not been found at all */
2231
2232 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2233 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2234 rv = -ENOENT;
2235 goto out;
2236 }
2237
2238 /* an lkb may be waiting for an rsb lookup to complete where the
2239 lookup was initiated by another lock */
2240
2241 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2242 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2243 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2244 list_del_init(&lkb->lkb_rsb_lookup);
2245 queue_cast(lkb->lkb_resource, lkb,
2246 args->flags & DLM_LKF_CANCEL ?
2247 -DLM_ECANCEL : -DLM_EUNLOCK);
2248 unhold_lkb(lkb); /* undoes create_lkb() */
2249 }
2250 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2251 rv = -EBUSY;
2252 goto out;
2253 }
2254
2255 /* cancel not allowed with another cancel/unlock in progress */
2256
2257 if (args->flags & DLM_LKF_CANCEL) {
2258 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2259 goto out;
2260
2261 if (is_overlap(lkb))
2262 goto out;
2263
2264 /* don't let scand try to do a cancel */
2265 del_timeout(lkb);
2266
2267 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2268 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2269 rv = -EBUSY;
2270 goto out;
2271 }
2272
2273 /* there's nothing to cancel */
2274 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2275 !lkb->lkb_wait_type) {
2276 rv = -EBUSY;
2277 goto out;
2278 }
2279
2280 switch (lkb->lkb_wait_type) {
2281 case DLM_MSG_LOOKUP:
2282 case DLM_MSG_REQUEST:
2283 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2284 rv = -EBUSY;
2285 goto out;
2286 case DLM_MSG_UNLOCK:
2287 case DLM_MSG_CANCEL:
2288 goto out;
2289 }
2290 /* add_to_waiters() will set OVERLAP_CANCEL */
2291 goto out_ok;
2292 }
2293
2294 /* do we need to allow a force-unlock if there's a normal unlock
2295 already in progress? under what conditions could the normal unlock
2296 fail such that we'd want to send a force-unlock to be sure? */
2297
2298 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2299 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2300 goto out;
2301
2302 if (is_overlap_unlock(lkb))
2303 goto out;
2304
2305 /* don't let scand try to do a cancel */
2306 del_timeout(lkb);
2307
2308 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2309 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2310 rv = -EBUSY;
2311 goto out;
2312 }
2313
2314 switch (lkb->lkb_wait_type) {
2315 case DLM_MSG_LOOKUP:
2316 case DLM_MSG_REQUEST:
2317 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2318 rv = -EBUSY;
2319 goto out;
2320 case DLM_MSG_UNLOCK:
2321 goto out;
2322 }
2323 /* add_to_waiters() will set OVERLAP_UNLOCK */
2324 goto out_ok;
2325 }
2326
2327 /* normal unlock not allowed if there's any op in progress */
2328 rv = -EBUSY;
2329 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2330 goto out;
2331
2332 out_ok:
2333 /* an overlapping op shouldn't blow away exflags from other op */
2334 lkb->lkb_exflags |= args->flags;
2335 lkb->lkb_sbflags = 0;
2336 lkb->lkb_astparam = args->astparam;
2337 rv = 0;
2338 out:
2339 if (rv)
2340 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2341 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2342 args->flags, lkb->lkb_wait_type,
2343 lkb->lkb_resource->res_name);
2344 return rv;
2345}
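/* Example (illustrative): if a normal unlock is requested while the
   original request is still on the waiters list marked for resend
   after recovery (DLM_IFL_RESEND), the unlock cannot run immediately;
   DLM_IFL_OVERLAP_UNLOCK is recorded and -EBUSY returned, and the
   unlock is performed later when the outstanding reply is processed
   (see receive_request_reply). */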
2346
2347/*
2348 * Four stage 4 varieties:
2349 * do_request(), do_convert(), do_unlock(), do_cancel()
2350 * These are called on the master node for the given lock and
2351 * from the central locking logic.
2352 */
2353
2354static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2355{
2356 int error = 0;
2357
2358 if (can_be_granted(r, lkb, 1, NULL)) {
2359 grant_lock(r, lkb);
2360 queue_cast(r, lkb, 0);
2361 goto out;
2362 }
2363
2364 if (can_be_queued(lkb)) {
2365 error = -EINPROGRESS;
2366 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2367 add_timeout(lkb);
2368 goto out;
2369 }
2370
2371 error = -EAGAIN;
2372 queue_cast(r, lkb, -EAGAIN);
2373 out:
2374 return error;
2375}
2376
2377static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2378 int error)
2379{
2380 switch (error) {
2381 case -EAGAIN:
2382 if (force_blocking_asts(lkb))
2383 send_blocking_asts_all(r, lkb);
2384 break;
2385 case -EINPROGRESS:
2386 send_blocking_asts(r, lkb);
2387 break;
2388 }
2389}
2390
2391static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2392{
2393 int error = 0;
2394 int deadlk = 0;
2395
2396 /* changing an existing lock may allow others to be granted */
2397
2398 if (can_be_granted(r, lkb, 1, &deadlk)) {
2399 grant_lock(r, lkb);
2400 queue_cast(r, lkb, 0);
2401 goto out;
2402 }
2403
2404 /* can_be_granted() detected that this lock would block in a conversion
2405 deadlock, so we leave it on the granted queue and return EDEADLK in
2406 the ast for the convert. */
2407
2408 if (deadlk) {
2409 /* it's left on the granted queue */
2410 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2411 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2412 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2413 revert_lock(r, lkb);
2414 queue_cast(r, lkb, -EDEADLK);
2415 error = -EDEADLK;
2416 goto out;
2417 }
2418
2419 /* is_demoted() means the can_be_granted() above set the grmode
2420 to NL, and left us on the granted queue. This auto-demotion
2421 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2422 now grantable. We have to try to grant other converting locks
2423 before we try again to grant this one. */
2424
2425 if (is_demoted(lkb)) {
2426 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2427 if (_can_be_granted(r, lkb, 1)) {
2428 grant_lock(r, lkb);
2429 queue_cast(r, lkb, 0);
2430 goto out;
2431 }
2432 /* else fall through and move to convert queue */
2433 }
2434
2435 if (can_be_queued(lkb)) {
2436 error = -EINPROGRESS;
2437 del_lkb(r, lkb);
2438 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2439 add_timeout(lkb);
2440 goto out;
2441 }
2442
2443 error = -EAGAIN;
2444 queue_cast(r, lkb, -EAGAIN);
2445 out:
2446 return error;
2447}
2448
2449static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2450 int error)
2451{
2452 switch (error) {
2453 case 0:
2454 grant_pending_locks(r);
2455 /* grant_pending_locks also sends basts */
2456 break;
2457 case -EAGAIN:
2458 if (force_blocking_asts(lkb))
2459 send_blocking_asts_all(r, lkb);
2460 break;
2461 case -EINPROGRESS:
2462 send_blocking_asts(r, lkb);
2463 break;
2464 }
2465}
2466
2467static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2468{
2469 remove_lock(r, lkb);
2470 queue_cast(r, lkb, -DLM_EUNLOCK);
2471 return -DLM_EUNLOCK;
2472}
2473
2474static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2475 int error)
2476{
2477 grant_pending_locks(r);
2478}
2479
2480/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2481
2482static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2483{
2484 int error;
2485
2486 error = revert_lock(r, lkb);
2487 if (error) {
2488 queue_cast(r, lkb, -DLM_ECANCEL);
2489 return -DLM_ECANCEL;
2490 }
2491 return 0;
2492}
2493
2494static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2495 int error)
2496{
2497 if (error)
2498 grant_pending_locks(r);
2499}
2500
2501/*
2502 * Four stage 3 varieties:
2503 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2504 */
2505
2506/* add a new lkb to a possibly new rsb, called by requesting process */
2507
2508static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2509{
2510 int error;
2511
2512 /* set_master: sets lkb nodeid from r */
2513
2514 error = set_master(r, lkb);
2515 if (error < 0)
2516 goto out;
2517 if (error) {
2518 error = 0;
2519 goto out;
2520 }
2521
2522 if (is_remote(r)) {
2523 /* receive_request() calls do_request() on remote node */
2524 error = send_request(r, lkb);
2525 } else {
2526 error = do_request(r, lkb);
2527 /* for remote locks the request_reply is sent
2528 between do_request and do_request_effects */
2529 do_request_effects(r, lkb, error);
2530 }
2531 out:
2532 return error;
2533}
2534
2535/* change some property of an existing lkb, e.g. mode */
2536
2537static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2538{
2539 int error;
2540
2541 if (is_remote(r)) {
2542 /* receive_convert() calls do_convert() on remote node */
2543 error = send_convert(r, lkb);
2544 } else {
2545 error = do_convert(r, lkb);
2546 /* for remote locks the convert_reply is sent
2547 between do_convert and do_convert_effects */
2548 do_convert_effects(r, lkb, error);
2549 }
2550
2551 return error;
2552}
2553
2554/* remove an existing lkb from the granted queue */
2555
2556static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2557{
2558 int error;
2559
2560 if (is_remote(r)) {
2561 /* receive_unlock() calls do_unlock() on remote node */
2562 error = send_unlock(r, lkb);
2563 } else {
2564 error = do_unlock(r, lkb);
2565 /* for remote locks the unlock_reply is sent
2566 between do_unlock and do_unlock_effects */
2567 do_unlock_effects(r, lkb, error);
2568 }
2569
2570 return error;
2571}
2572
2573/* remove an existing lkb from the convert or wait queue */
2574
2575static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2576{
2577 int error;
2578
2579 if (is_remote(r)) {
2580 /* receive_cancel() calls do_cancel() on remote node */
2581 error = send_cancel(r, lkb);
2582 } else {
2583 error = do_cancel(r, lkb);
2584 /* for remote locks the cancel_reply is sent
2585 between do_cancel and do_cancel_effects */
2586 do_cancel_effects(r, lkb, error);
2587 }
2588
2589 return error;
2590}
2591
2592/*
2593 * Four stage 2 varieties:
2594 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2595 */
2596
2597static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2598 int len, struct dlm_args *args)
2599{
2600 struct dlm_rsb *r;
2601 int error;
2602
2603 error = validate_lock_args(ls, lkb, args);
2604 if (error)
2605 goto out;
2606
2607 error = find_rsb(ls, name, len, R_CREATE, &r);
2608 if (error)
2609 goto out;
2610
2611 lock_rsb(r);
2612
2613 attach_lkb(r, lkb);
2614 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2615
2616 error = _request_lock(r, lkb);
2617
2618 unlock_rsb(r);
2619 put_rsb(r);
2620
2621 out:
2622 return error;
2623}
2624
2625static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2626 struct dlm_args *args)
2627{
2628 struct dlm_rsb *r;
2629 int error;
2630
2631 r = lkb->lkb_resource;
2632
2633 hold_rsb(r);
2634 lock_rsb(r);
2635
2636 error = validate_lock_args(ls, lkb, args);
2637 if (error)
2638 goto out;
2639
2640 error = _convert_lock(r, lkb);
2641 out:
2642 unlock_rsb(r);
2643 put_rsb(r);
2644 return error;
2645}
2646
2647static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2648 struct dlm_args *args)
2649{
2650 struct dlm_rsb *r;
2651 int error;
2652
2653 r = lkb->lkb_resource;
2654
2655 hold_rsb(r);
2656 lock_rsb(r);
2657
2658 error = validate_unlock_args(lkb, args);
2659 if (error)
2660 goto out;
2661
2662 error = _unlock_lock(r, lkb);
2663 out:
2664 unlock_rsb(r);
2665 put_rsb(r);
2666 return error;
2667}
2668
2669static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2670 struct dlm_args *args)
2671{
2672 struct dlm_rsb *r;
2673 int error;
2674
2675 r = lkb->lkb_resource;
2676
2677 hold_rsb(r);
2678 lock_rsb(r);
2679
2680 error = validate_unlock_args(lkb, args);
2681 if (error)
2682 goto out;
2683
2684 error = _cancel_lock(r, lkb);
2685 out:
2686 unlock_rsb(r);
2687 put_rsb(r);
2688 return error;
2689}
2690
2691/*
2692 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2693 */
2694
2695int dlm_lock(dlm_lockspace_t *lockspace,
2696 int mode,
2697 struct dlm_lksb *lksb,
2698 uint32_t flags,
2699 void *name,
2700 unsigned int namelen,
2701 uint32_t parent_lkid,
2702 void (*ast) (void *astarg),
2703 void *astarg,
2704 void (*bast) (void *astarg, int mode))
2705{
2706 struct dlm_ls *ls;
2707 struct dlm_lkb *lkb;
2708 struct dlm_args args;
2709 int error, convert = flags & DLM_LKF_CONVERT;
2710
2711 ls = dlm_find_lockspace_local(lockspace);
2712 if (!ls)
2713 return -EINVAL;
2714
2715 dlm_lock_recovery(ls);
2716
2717 if (convert)
2718 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2719 else
2720 error = create_lkb(ls, &lkb);
2721
2722 if (error)
2723 goto out;
2724
2725 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2726 astarg, bast, &args);
2727 if (error)
2728 goto out_put;
2729
2730 if (convert)
2731 error = convert_lock(ls, lkb, &args);
2732 else
2733 error = request_lock(ls, lkb, name, namelen, &args);
2734
2735 if (error == -EINPROGRESS)
2736 error = 0;
2737 out_put:
2738 if (convert || error)
2739 __put_lkb(ls, lkb);
2740 if (error == -EAGAIN || error == -EDEADLK)
2741 error = 0;
2742 out:
2743 dlm_unlock_recovery(ls);
2744 dlm_put_lockspace(ls);
2745 return error;
2746}
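/* Illustrative sketch (not part of this file): a typical kernel caller
   drives dlm_lock() asynchronously and waits for the completion ast
   before reading lksb.sb_status.  The lockspace handle "ls", the
   completion "done" and the callback "sample_ast" are assumptions of
   the example, not names used by the dlm itself.

	static struct completion done;
	static struct dlm_lksb lksb;

	static void sample_ast(void *arg)
	{
		complete(arg);
	}

	init_completion(&done);
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 sample_ast, &done, NULL);
	if (!error) {
		wait_for_completion(&done);
		error = lksb.sb_status;
	}
*/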
2747
2748int dlm_unlock(dlm_lockspace_t *lockspace,
2749 uint32_t lkid,
2750 uint32_t flags,
2751 struct dlm_lksb *lksb,
2752 void *astarg)
2753{
2754 struct dlm_ls *ls;
2755 struct dlm_lkb *lkb;
2756 struct dlm_args args;
2757 int error;
2758
2759 ls = dlm_find_lockspace_local(lockspace);
2760 if (!ls)
2761 return -EINVAL;
2762
2763 dlm_lock_recovery(ls);
2764
2765 error = find_lkb(ls, lkid, &lkb);
2766 if (error)
2767 goto out;
2768
2769 error = set_unlock_args(flags, astarg, &args);
2770 if (error)
2771 goto out_put;
2772
2773 if (flags & DLM_LKF_CANCEL)
2774 error = cancel_lock(ls, lkb, &args);
2775 else
2776 error = unlock_lock(ls, lkb, &args);
2777
2778 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2779 error = 0;
2780 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2781 error = 0;
2782 out_put:
2783 dlm_put_lkb(lkb);
2784 out:
2785 dlm_unlock_recovery(ls);
2786 dlm_put_lockspace(ls);
2787 return error;
2788}
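/* Illustrative sketch (continuing the example above): releasing the
   lock.  The same ast fires again and lksb.sb_status carries
   -DLM_EUNLOCK once the unlock completes; cancelling a still-queued
   request would pass DLM_LKF_CANCEL instead and see -DLM_ECANCEL.

	init_completion(&done);
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
	if (!error) {
		wait_for_completion(&done);
		error = lksb.sb_status;
	}
*/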
2789
2790/*
2791 * send/receive routines for remote operations and replies
2792 *
2793 * send_args
2794 * send_common
2795 * send_request receive_request
2796 * send_convert receive_convert
2797 * send_unlock receive_unlock
2798 * send_cancel receive_cancel
2799 * send_grant receive_grant
2800 * send_bast receive_bast
2801 * send_lookup receive_lookup
2802 * send_remove receive_remove
2803 *
2804 * send_common_reply
2805 * receive_request_reply send_request_reply
2806 * receive_convert_reply send_convert_reply
2807 * receive_unlock_reply send_unlock_reply
2808 * receive_cancel_reply send_cancel_reply
2809 * receive_lookup_reply send_lookup_reply
2810 */
2811
2812static int _create_message(struct dlm_ls *ls, int mb_len,
2813 int to_nodeid, int mstype,
2814 struct dlm_message **ms_ret,
2815 struct dlm_mhandle **mh_ret)
2816{
2817 struct dlm_message *ms;
2818 struct dlm_mhandle *mh;
2819 char *mb;
2820
2821 /* get_buffer gives us a message handle (mh) that we need to
2822 pass into lowcomms_commit and a message buffer (mb) that we
2823 write our data into */
2824
2825 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2826 if (!mh)
2827 return -ENOBUFS;
2828
2829 memset(mb, 0, mb_len);
2830
2831 ms = (struct dlm_message *) mb;
2832
2833 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2834 ms->m_header.h_lockspace = ls->ls_global_id;
2835 ms->m_header.h_nodeid = dlm_our_nodeid();
2836 ms->m_header.h_length = mb_len;
2837 ms->m_header.h_cmd = DLM_MSG;
2838
2839 ms->m_type = mstype;
2840
2841 *mh_ret = mh;
2842 *ms_ret = ms;
2843 return 0;
2844}
2845
2846static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2847 int to_nodeid, int mstype,
2848 struct dlm_message **ms_ret,
2849 struct dlm_mhandle **mh_ret)
2850{
2851 int mb_len = sizeof(struct dlm_message);
2852
2853 switch (mstype) {
2854 case DLM_MSG_REQUEST:
2855 case DLM_MSG_LOOKUP:
2856 case DLM_MSG_REMOVE:
2857 mb_len += r->res_length;
2858 break;
2859 case DLM_MSG_CONVERT:
2860 case DLM_MSG_UNLOCK:
2861 case DLM_MSG_REQUEST_REPLY:
2862 case DLM_MSG_CONVERT_REPLY:
2863 case DLM_MSG_GRANT:
2864 if (lkb && lkb->lkb_lvbptr)
2865 mb_len += r->res_ls->ls_lvblen;
2866 break;
2867 }
2868
2869 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2870 ms_ret, mh_ret);
2871}
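/* Example (illustrative): a DLM_MSG_REQUEST for a resource with a
   16-byte name is allocated with mb_len = sizeof(struct dlm_message)
   + 16; send_args() copies the name into ms->m_extra and the receiver
   recovers the 16 bytes with receive_extralen(). */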
2872
2873/* further lowcomms enhancements or alternate implementations may make
2874 the return value from this function useful at some point */
2875
2876static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2877{
2878 dlm_message_out(ms);
2879 dlm_lowcomms_commit_buffer(mh);
2880 return 0;
2881}
2882
2883static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2884 struct dlm_message *ms)
2885{
2886 ms->m_nodeid = lkb->lkb_nodeid;
2887 ms->m_pid = lkb->lkb_ownpid;
2888 ms->m_lkid = lkb->lkb_id;
2889 ms->m_remid = lkb->lkb_remid;
2890 ms->m_exflags = lkb->lkb_exflags;
2891 ms->m_sbflags = lkb->lkb_sbflags;
2892 ms->m_flags = lkb->lkb_flags;
2893 ms->m_lvbseq = lkb->lkb_lvbseq;
2894 ms->m_status = lkb->lkb_status;
2895 ms->m_grmode = lkb->lkb_grmode;
2896 ms->m_rqmode = lkb->lkb_rqmode;
2897 ms->m_hash = r->res_hash;
2898
2899 /* m_result and m_bastmode are set from function args,
2900 not from lkb fields */
2901
2902 if (lkb->lkb_bastfn)
2903 ms->m_asts |= DLM_CB_BAST;
2904 if (lkb->lkb_astfn)
2905 ms->m_asts |= DLM_CB_CAST;
2906
2907 /* compare with switch in create_message; send_remove() doesn't
2908 use send_args() */
2909
2910 switch (ms->m_type) {
2911 case DLM_MSG_REQUEST:
2912 case DLM_MSG_LOOKUP:
2913 memcpy(ms->m_extra, r->res_name, r->res_length);
2914 break;
2915 case DLM_MSG_CONVERT:
2916 case DLM_MSG_UNLOCK:
2917 case DLM_MSG_REQUEST_REPLY:
2918 case DLM_MSG_CONVERT_REPLY:
2919 case DLM_MSG_GRANT:
2920 if (!lkb->lkb_lvbptr)
2921 break;
2922 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2923 break;
2924 }
2925}
2926
2927static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2928{
2929 struct dlm_message *ms;
2930 struct dlm_mhandle *mh;
2931 int to_nodeid, error;
2932
2933 to_nodeid = r->res_nodeid;
2934
2935 error = add_to_waiters(lkb, mstype, to_nodeid);
2936 if (error)
2937 return error;
2938
2939 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2940 if (error)
2941 goto fail;
2942
2943 send_args(r, lkb, ms);
2944
2945 error = send_message(mh, ms);
2946 if (error)
2947 goto fail;
2948 return 0;
2949
2950 fail:
2951 remove_from_waiters(lkb, msg_reply_type(mstype));
2952 return error;
2953}
2954
2955static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2956{
2957 return send_common(r, lkb, DLM_MSG_REQUEST);
2958}
2959
2960static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2961{
2962 int error;
2963
2964 error = send_common(r, lkb, DLM_MSG_CONVERT);
2965
2966 /* down conversions go without a reply from the master */
2967 if (!error && down_conversion(lkb)) {
2968 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2969 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2970 r->res_ls->ls_stub_ms.m_result = 0;
2971 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2972 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2973 }
2974
2975 return error;
2976}
2977
2978/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2979 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2980 that the master is still correct. */
2981
2982static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2983{
2984 return send_common(r, lkb, DLM_MSG_UNLOCK);
2985}
2986
2987static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2988{
2989 return send_common(r, lkb, DLM_MSG_CANCEL);
2990}
2991
2992static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2993{
2994 struct dlm_message *ms;
2995 struct dlm_mhandle *mh;
2996 int to_nodeid, error;
2997
2998 to_nodeid = lkb->lkb_nodeid;
2999
3000 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3001 if (error)
3002 goto out;
3003
3004 send_args(r, lkb, ms);
3005
3006 ms->m_result = 0;
3007
3008 error = send_message(mh, ms);
3009 out:
3010 return error;
3011}
3012
3013static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3014{
3015 struct dlm_message *ms;
3016 struct dlm_mhandle *mh;
3017 int to_nodeid, error;
3018
3019 to_nodeid = lkb->lkb_nodeid;
3020
3021 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3022 if (error)
3023 goto out;
3024
3025 send_args(r, lkb, ms);
3026
3027 ms->m_bastmode = mode;
3028
3029 error = send_message(mh, ms);
3030 out:
3031 return error;
3032}
3033
3034static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3035{
3036 struct dlm_message *ms;
3037 struct dlm_mhandle *mh;
3038 int to_nodeid, error;
3039
3040 to_nodeid = dlm_dir_nodeid(r);
3041
3042 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3043 if (error)
3044 return error;
3045
3046 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3047 if (error)
3048 goto fail;
3049
3050 send_args(r, lkb, ms);
3051
3052 error = send_message(mh, ms);
3053 if (error)
3054 goto fail;
3055 return 0;
3056
3057 fail:
3058 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3059 return error;
3060}
3061
3062static int send_remove(struct dlm_rsb *r)
3063{
3064 struct dlm_message *ms;
3065 struct dlm_mhandle *mh;
3066 int to_nodeid, error;
3067
3068 to_nodeid = dlm_dir_nodeid(r);
3069
3070 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3071 if (error)
3072 goto out;
3073
3074 memcpy(ms->m_extra, r->res_name, r->res_length);
3075 ms->m_hash = r->res_hash;
3076
3077 error = send_message(mh, ms);
3078 out:
3079 return error;
3080}
3081
3082static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3083 int mstype, int rv)
3084{
3085 struct dlm_message *ms;
3086 struct dlm_mhandle *mh;
3087 int to_nodeid, error;
3088
3089 to_nodeid = lkb->lkb_nodeid;
3090
3091 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3092 if (error)
3093 goto out;
3094
3095 send_args(r, lkb, ms);
3096
3097 ms->m_result = rv;
3098
3099 error = send_message(mh, ms);
3100 out:
3101 return error;
3102}
3103
3104static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3105{
3106 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3107}
3108
3109static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3110{
3111 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3112}
3113
3114static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3115{
3116 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3117}
3118
3119static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3120{
3121 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3122}
3123
3124static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3125 int ret_nodeid, int rv)
3126{
3127 struct dlm_rsb *r = &ls->ls_stub_rsb;
3128 struct dlm_message *ms;
3129 struct dlm_mhandle *mh;
3130 int error, nodeid = ms_in->m_header.h_nodeid;
3131
3132 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3133 if (error)
3134 goto out;
3135
3136 ms->m_lkid = ms_in->m_lkid;
3137 ms->m_result = rv;
3138 ms->m_nodeid = ret_nodeid;
3139
3140 error = send_message(mh, ms);
3141 out:
3142 return error;
3143}
3144
3145/* which args we save from a received message depends heavily on the type
3146 of message, unlike the send side where we can safely send everything about
3147 the lkb for any type of message */
3148
3149static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3150{
3151 lkb->lkb_exflags = ms->m_exflags;
3152 lkb->lkb_sbflags = ms->m_sbflags;
3153 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3154 (ms->m_flags & 0x0000FFFF);
3155}
3156
3157static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3158{
3159 lkb->lkb_sbflags = ms->m_sbflags;
3160 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3161 (ms->m_flags & 0x0000FFFF);
3162}
3163
3164static int receive_extralen(struct dlm_message *ms)
3165{
3166 return (ms->m_header.h_length - sizeof(struct dlm_message));
3167}
3168
3169static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3170 struct dlm_message *ms)
3171{
3172 int len;
3173
3174 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3175 if (!lkb->lkb_lvbptr)
3176 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3177 if (!lkb->lkb_lvbptr)
3178 return -ENOMEM;
3179 len = receive_extralen(ms);
3180 if (len > DLM_RESNAME_MAXLEN)
3181 len = DLM_RESNAME_MAXLEN;
3182 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3183 }
3184 return 0;
3185}
3186
3187static void fake_bastfn(void *astparam, int mode)
3188{
3189 log_print("fake_bastfn should not be called");
3190}
3191
3192static void fake_astfn(void *astparam)
3193{
3194 log_print("fake_astfn should not be called");
3195}
3196
3197static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3198 struct dlm_message *ms)
3199{
3200 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3201 lkb->lkb_ownpid = ms->m_pid;
3202 lkb->lkb_remid = ms->m_lkid;
3203 lkb->lkb_grmode = DLM_LOCK_IV;
3204 lkb->lkb_rqmode = ms->m_rqmode;
3205
3206 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3207 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3208
3209 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3210 /* lkb was just created so there won't be an lvb yet */
3211 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3212 if (!lkb->lkb_lvbptr)
3213 return -ENOMEM;
3214 }
3215
3216 return 0;
3217}
3218
3219static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3220 struct dlm_message *ms)
3221{
3222 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3223 return -EBUSY;
3224
e7fd4179
DT
3225 if (receive_lvb(ls, lkb, ms))
3226 return -ENOMEM;
3227
3228 lkb->lkb_rqmode = ms->m_rqmode;
3229 lkb->lkb_lvbseq = ms->m_lvbseq;
3230
3231 return 0;
3232}
3233
3234static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3235 struct dlm_message *ms)
3236{
3237 if (receive_lvb(ls, lkb, ms))
3238 return -ENOMEM;
3239 return 0;
3240}
3241
3242/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3243 uses to send a reply and that the remote end uses to process the reply. */
3244
3245static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3246{
3247 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3248 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3249 lkb->lkb_remid = ms->m_lkid;
3250}
3251
3252/* This is called after the rsb is locked so that we can safely inspect
3253 fields in the lkb. */
3254
3255static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3256{
3257 int from = ms->m_header.h_nodeid;
3258 int error = 0;
3259
3260 switch (ms->m_type) {
3261 case DLM_MSG_CONVERT:
3262 case DLM_MSG_UNLOCK:
3263 case DLM_MSG_CANCEL:
3264 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3265 error = -EINVAL;
3266 break;
3267
3268 case DLM_MSG_CONVERT_REPLY:
3269 case DLM_MSG_UNLOCK_REPLY:
3270 case DLM_MSG_CANCEL_REPLY:
3271 case DLM_MSG_GRANT:
3272 case DLM_MSG_BAST:
3273 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3274 error = -EINVAL;
3275 break;
3276
3277 case DLM_MSG_REQUEST_REPLY:
3278 if (!is_process_copy(lkb))
3279 error = -EINVAL;
3280 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3281 error = -EINVAL;
3282 break;
3283
3284 default:
3285 error = -EINVAL;
3286 }
3287
3288 if (error)
3289 log_error(lkb->lkb_resource->res_ls,
3290 "ignore invalid message %d from %d %x %x %x %d",
3291 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3292 lkb->lkb_flags, lkb->lkb_nodeid);
3293 return error;
3294}
3295
3296static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3297{
3298 struct dlm_lkb *lkb;
3299 struct dlm_rsb *r;
3300 int error, namelen;
3301
3302 error = create_lkb(ls, &lkb);
3303 if (error)
3304 goto fail;
3305
3306 receive_flags(lkb, ms);
3307 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3308 error = receive_request_args(ls, lkb, ms);
3309 if (error) {
3310 __put_lkb(ls, lkb);
3311 goto fail;
3312 }
3313
3314 namelen = receive_extralen(ms);
3315
3316 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3317 if (error) {
3318 __put_lkb(ls, lkb);
3319 goto fail;
3320 }
3321
3322 lock_rsb(r);
3323
3324 attach_lkb(r, lkb);
3325 error = do_request(r, lkb);
3326 send_request_reply(r, lkb, error);
3327 do_request_effects(r, lkb, error);
3328
3329 unlock_rsb(r);
3330 put_rsb(r);
3331
3332 if (error == -EINPROGRESS)
3333 error = 0;
3334 if (error)
3335 dlm_put_lkb(lkb);
3336 return;
3337
3338 fail:
3339 setup_stub_lkb(ls, ms);
3340 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3341}
3342
3343static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3344{
3345 struct dlm_lkb *lkb;
3346 struct dlm_rsb *r;
3347 int error, reply = 1;
3348
3349 error = find_lkb(ls, ms->m_remid, &lkb);
3350 if (error)
3351 goto fail;
3352
3353 r = lkb->lkb_resource;
3354
3355 hold_rsb(r);
3356 lock_rsb(r);
3357
3358 error = validate_message(lkb, ms);
3359 if (error)
3360 goto out;
3361
3362 receive_flags(lkb, ms);
3363
3364 error = receive_convert_args(ls, lkb, ms);
3365 if (error) {
3366 send_convert_reply(r, lkb, error);
3367 goto out;
3368 }
3369
3370 reply = !down_conversion(lkb);
3371
3372 error = do_convert(r, lkb);
3373 if (reply)
3374 send_convert_reply(r, lkb, error);
3375 do_convert_effects(r, lkb, error);
3376 out:
3377 unlock_rsb(r);
3378 put_rsb(r);
3379 dlm_put_lkb(lkb);
3380 return;
3381
3382 fail:
3383 setup_stub_lkb(ls, ms);
3384 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3385}
3386
3387static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3388{
3389 struct dlm_lkb *lkb;
3390 struct dlm_rsb *r;
3391 int error;
3392
3393 error = find_lkb(ls, ms->m_remid, &lkb);
3394 if (error)
3395 goto fail;
3396
3397 r = lkb->lkb_resource;
3398
3399 hold_rsb(r);
3400 lock_rsb(r);
3401
3402 error = validate_message(lkb, ms);
3403 if (error)
3404 goto out;
3405
3406 receive_flags(lkb, ms);
3407
3408 error = receive_unlock_args(ls, lkb, ms);
3409 if (error) {
3410 send_unlock_reply(r, lkb, error);
3411 goto out;
3412 }
3413
3414 error = do_unlock(r, lkb);
3415 send_unlock_reply(r, lkb, error);
3416 do_unlock_effects(r, lkb, error);
3417 out:
3418 unlock_rsb(r);
3419 put_rsb(r);
3420 dlm_put_lkb(lkb);
3421 return;
3422
3423 fail:
3424 setup_stub_lkb(ls, ms);
3425 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3426}
3427
3428static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3429{
3430 struct dlm_lkb *lkb;
3431 struct dlm_rsb *r;
3432 int error;
3433
3434 error = find_lkb(ls, ms->m_remid, &lkb);
3435 if (error)
3436 goto fail;
3437
3438 receive_flags(lkb, ms);
3439
3440 r = lkb->lkb_resource;
3441
3442 hold_rsb(r);
3443 lock_rsb(r);
3444
3445 error = validate_message(lkb, ms);
3446 if (error)
3447 goto out;
3448
3449 error = do_cancel(r, lkb);
3450 send_cancel_reply(r, lkb, error);
3451 do_cancel_effects(r, lkb, error);
3452 out:
3453 unlock_rsb(r);
3454 put_rsb(r);
3455 dlm_put_lkb(lkb);
3456 return;
3457
3458 fail:
3459 setup_stub_lkb(ls, ms);
3460 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3461}
3462
3463static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3464{
3465 struct dlm_lkb *lkb;
3466 struct dlm_rsb *r;
3467 int error;
3468
3469 error = find_lkb(ls, ms->m_remid, &lkb);
3470 if (error) {
3471 log_debug(ls, "receive_grant from %d no lkb %x",
3472 ms->m_header.h_nodeid, ms->m_remid);
3473 return;
3474 }
3475
3476 r = lkb->lkb_resource;
3477
3478 hold_rsb(r);
3479 lock_rsb(r);
3480
3481 error = validate_message(lkb, ms);
3482 if (error)
3483 goto out;
3484
3485 receive_flags_reply(lkb, ms);
3486 if (is_altmode(lkb))
3487 munge_altmode(lkb, ms);
3488 grant_lock_pc(r, lkb, ms);
3489 queue_cast(r, lkb, 0);
3490 out:
3491 unlock_rsb(r);
3492 put_rsb(r);
3493 dlm_put_lkb(lkb);
3494}
3495
3496static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3497{
3498 struct dlm_lkb *lkb;
3499 struct dlm_rsb *r;
3500 int error;
3501
3502 error = find_lkb(ls, ms->m_remid, &lkb);
3503 if (error) {
3504 log_debug(ls, "receive_bast from %d no lkb %x",
3505 ms->m_header.h_nodeid, ms->m_remid);
3506 return;
3507 }
3508
3509 r = lkb->lkb_resource;
3510
3511 hold_rsb(r);
3512 lock_rsb(r);
3513
3514 error = validate_message(lkb, ms);
3515 if (error)
3516 goto out;
3517
3518 queue_bast(r, lkb, ms->m_bastmode);
3519 out:
3520 unlock_rsb(r);
3521 put_rsb(r);
3522 dlm_put_lkb(lkb);
3523}
3524
3525static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3526{
3527 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3528
3529 from_nodeid = ms->m_header.h_nodeid;
3530 our_nodeid = dlm_our_nodeid();
3531
3532 len = receive_extralen(ms);
3533
3534 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3535 if (dir_nodeid != our_nodeid) {
3536 log_error(ls, "lookup dir_nodeid %d from %d",
3537 dir_nodeid, from_nodeid);
3538 error = -EINVAL;
3539 ret_nodeid = -1;
3540 goto out;
3541 }
3542
3543 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3544
3545 /* Optimization: we're master so treat lookup as a request */
3546 if (!error && ret_nodeid == our_nodeid) {
3547 receive_request(ls, ms);
3548 return;
3549 }
3550 out:
3551 send_lookup_reply(ls, ms, ret_nodeid, error);
3552}
3553
3554static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3555{
3556 int len, dir_nodeid, from_nodeid;
3557
3558 from_nodeid = ms->m_header.h_nodeid;
3559
3560 len = receive_extralen(ms);
3561
3562 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3563 if (dir_nodeid != dlm_our_nodeid()) {
3564 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3565 dir_nodeid, from_nodeid);
3566 return;
3567 }
3568
3569 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3570}
3571
3572static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3573{
3574 do_purge(ls, ms->m_nodeid, ms->m_pid);
3575}
3576
3577static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3578{
3579 struct dlm_lkb *lkb;
3580 struct dlm_rsb *r;
3581 int error, mstype, result;
3582
3583 error = find_lkb(ls, ms->m_remid, &lkb);
3584 if (error) {
3585 log_debug(ls, "receive_request_reply from %d no lkb %x",
3586 ms->m_header.h_nodeid, ms->m_remid);
3587 return;
3588 }
3589
3590 r = lkb->lkb_resource;
3591 hold_rsb(r);
3592 lock_rsb(r);
3593
3594 error = validate_message(lkb, ms);
3595 if (error)
3596 goto out;
3597
3598 mstype = lkb->lkb_wait_type;
3599 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3600 if (error)
3601 goto out;
3602
3603 /* Optimization: the dir node was also the master, so it took our
3604 lookup as a request and sent request reply instead of lookup reply */
3605 if (mstype == DLM_MSG_LOOKUP) {
3606 r->res_nodeid = ms->m_header.h_nodeid;
3607 lkb->lkb_nodeid = r->res_nodeid;
3608 }
3609
3610 /* this is the value returned from do_request() on the master */
3611 result = ms->m_result;
3612
3613 switch (result) {
3614 case -EAGAIN:
3615 /* request would block (be queued) on remote master */
3616 queue_cast(r, lkb, -EAGAIN);
3617 confirm_master(r, -EAGAIN);
3618 unhold_lkb(lkb); /* undoes create_lkb() */
3619 break;
3620
3621 case -EINPROGRESS:
3622 case 0:
3623 /* request was queued or granted on remote master */
3624 receive_flags_reply(lkb, ms);
3625 lkb->lkb_remid = ms->m_lkid;
3626 if (is_altmode(lkb))
3627 munge_altmode(lkb, ms);
3628 if (result) {
3629 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3630 add_timeout(lkb);
3631 } else {
3632 grant_lock_pc(r, lkb, ms);
3633 queue_cast(r, lkb, 0);
3634 }
3635 confirm_master(r, result);
3636 break;
3637
3638 case -EBADR:
3639 case -ENOTBLK:
3640 /* find_rsb failed to find rsb or rsb wasn't master */
3641 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3642 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3643 r->res_nodeid = -1;
3644 lkb->lkb_nodeid = -1;
3645
3646 if (is_overlap(lkb)) {
3647 /* we'll ignore error in cancel/unlock reply */
3648 queue_cast_overlap(r, lkb);
3649 confirm_master(r, result);
3650 unhold_lkb(lkb); /* undoes create_lkb() */
3651 } else
3652 _request_lock(r, lkb);
3653 break;
3654
3655 default:
3656 log_error(ls, "receive_request_reply %x error %d",
3657 lkb->lkb_id, result);
3658 }
3659
3660 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3661 log_debug(ls, "receive_request_reply %x result %d unlock",
3662 lkb->lkb_id, result);
3663 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3664 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3665 send_unlock(r, lkb);
3666 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3667 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3668 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3669 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3670 send_cancel(r, lkb);
3671 } else {
3672 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3673 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3674 }
3675 out:
3676 unlock_rsb(r);
3677 put_rsb(r);
3678 dlm_put_lkb(lkb);
3679}
3680
3681static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3682 struct dlm_message *ms)
3683{
3684 /* this is the value returned from do_convert() on the master */
3685 switch (ms->m_result) {
3686 case -EAGAIN:
3687 /* convert would block (be queued) on remote master */
3688 queue_cast(r, lkb, -EAGAIN);
3689 break;
3690
3691 case -EDEADLK:
3692 receive_flags_reply(lkb, ms);
3693 revert_lock_pc(r, lkb);
3694 queue_cast(r, lkb, -EDEADLK);
3695 break;
3696
3697 case -EINPROGRESS:
3698 /* convert was queued on remote master */
3699 receive_flags_reply(lkb, ms);
3700 if (is_demoted(lkb))
3701 munge_demoted(lkb, ms);
3702 del_lkb(r, lkb);
3703 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3704 add_timeout(lkb);
3705 break;
3706
3707 case 0:
3708 /* convert was granted on remote master */
3709 receive_flags_reply(lkb, ms);
3710 if (is_demoted(lkb))
3711 munge_demoted(lkb, ms);
3712 grant_lock_pc(r, lkb, ms);
3713 queue_cast(r, lkb, 0);
3714 break;
3715
3716 default:
3717 log_error(r->res_ls, "receive_convert_reply %x error %d",
3718 lkb->lkb_id, ms->m_result);
3719 }
3720}
3721
3722static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3723{
3724 struct dlm_rsb *r = lkb->lkb_resource;
3725 int error;
3726
3727 hold_rsb(r);
3728 lock_rsb(r);
3729
3730 error = validate_message(lkb, ms);
3731 if (error)
3732 goto out;
3733
3734 /* stub reply can happen with waiters_mutex held */
3735 error = remove_from_waiters_ms(lkb, ms);
3736 if (error)
3737 goto out;
3738
3739 __receive_convert_reply(r, lkb, ms);
3740 out:
3741 unlock_rsb(r);
3742 put_rsb(r);
3743}
3744
3745static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3746{
3747 struct dlm_lkb *lkb;
3748 int error;
3749
3750 error = find_lkb(ls, ms->m_remid, &lkb);
3751 if (error) {
3752 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3753 ms->m_header.h_nodeid, ms->m_remid);
3754 return;
3755 }
3756
3757 _receive_convert_reply(lkb, ms);
3758 dlm_put_lkb(lkb);
3759}
3760
3761static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3762{
3763 struct dlm_rsb *r = lkb->lkb_resource;
3764 int error;
3765
3766 hold_rsb(r);
3767 lock_rsb(r);
3768
3769 error = validate_message(lkb, ms);
3770 if (error)
3771 goto out;
3772
3773 /* stub reply can happen with waiters_mutex held */
3774 error = remove_from_waiters_ms(lkb, ms);
3775 if (error)
3776 goto out;
3777
3778 /* this is the value returned from do_unlock() on the master */
3779
3780 switch (ms->m_result) {
3781 case -DLM_EUNLOCK:
3782 receive_flags_reply(lkb, ms);
3783 remove_lock_pc(r, lkb);
3784 queue_cast(r, lkb, -DLM_EUNLOCK);
3785 break;
3786 case -ENOENT:
3787 break;
3788 default:
3789 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3790 lkb->lkb_id, ms->m_result);
3791 }
3792 out:
3793 unlock_rsb(r);
3794 put_rsb(r);
3795}
3796
3797static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3798{
3799 struct dlm_lkb *lkb;
3800 int error;
3801
3802 error = find_lkb(ls, ms->m_remid, &lkb);
3803 if (error) {
3804 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3805 ms->m_header.h_nodeid, ms->m_remid);
3806 return;
3807 }
3808
3809 _receive_unlock_reply(lkb, ms);
3810 dlm_put_lkb(lkb);
3811}
3812
3813static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3814{
3815 struct dlm_rsb *r = lkb->lkb_resource;
3816 int error;
3817
3818 hold_rsb(r);
3819 lock_rsb(r);
3820
3821 error = validate_message(lkb, ms);
3822 if (error)
3823 goto out;
3824
3825 /* stub reply can happen with waiters_mutex held */
3826 error = remove_from_waiters_ms(lkb, ms);
3827 if (error)
3828 goto out;
3829
3830 /* this is the value returned from do_cancel() on the master */
3831
3832 switch (ms->m_result) {
3833 case -DLM_ECANCEL:
3834 receive_flags_reply(lkb, ms);
3835 revert_lock_pc(r, lkb);
84d8cd69 3836 queue_cast(r, lkb, -DLM_ECANCEL);
ef0c2bb0
DT
3837 break;
3838 case 0:
e7fd4179
DT
3839 break;
3840 default:
ef0c2bb0
DT
3841 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3842 lkb->lkb_id, ms->m_result);
e7fd4179 3843 }
ef0c2bb0 3844 out:
e7fd4179
DT
3845 unlock_rsb(r);
3846 put_rsb(r);
3847}
3848
3849static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3850{
3851 struct dlm_lkb *lkb;
3852 int error;
3853
3854 error = find_lkb(ls, ms->m_remid, &lkb);
3855 if (error) {
c54e04b0
DT
3856 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3857 ms->m_header.h_nodeid, ms->m_remid);
e7fd4179
DT
3858 return;
3859 }
e7fd4179 3860
e7fd4179 3861 _receive_cancel_reply(lkb, ms);
b3f58d8f 3862 dlm_put_lkb(lkb);
e7fd4179
DT
3863}
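
/* A dir node has answered our DLM_MSG_LOOKUP with the id of the node
   mastering the resource. Record the master (possibly ourselves), then
   restart the stalled request, unless an overlapping unlock/cancel
   arrived while the lookup was outstanding. */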
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
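
/* Dispatch one incoming locking message to its handler based on m_type,
   ignoring messages from nodes that are no longer lockspace members. */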
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_astd_wake();
}

/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done. When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones. This occurs right
   after recovery completes when we transition from saving all messages on
   requestqueue, to processing all the saved messages, to processing new
   messages as they arrive. */

static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms);
	}
}

/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}

/* This is called by the midcomms layer when something is received for
   the lockspace. It could be either a MSG (normal message sent as part of
   standard locking activity) or an RCOM (recovery message sent as part of
   lockspace recovery). */

void dlm_receive_buffer(union dlm_packet *p, int nodeid)
{
	struct dlm_header *hd = &p->header;
	struct dlm_ls *ls;
	int type = 0;

	switch (hd->h_cmd) {
	case DLM_MSG:
		dlm_message_in(&p->message);
		type = p->message.m_type;
		break;
	case DLM_RCOM:
		dlm_rcom_in(&p->rcom);
		type = p->rcom.rc_type;
		break;
	default:
		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
		return;
	}

	if (hd->h_nodeid != nodeid) {
		log_print("invalid h_nodeid %d from %d lockspace %x",
			  hd->h_nodeid, nodeid, hd->h_lockspace);
		return;
	}

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		if (dlm_config.ci_log_debug)
			log_print("invalid lockspace %x from %d cmd %d type %d",
				  hd->h_lockspace, nodeid, hd->h_cmd, type);

		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
			dlm_send_ls_not_ready(nodeid, &p->rcom);
		return;
	}

	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
	   be inactive (in this ls) before transitioning to recovery mode */

	down_read(&ls->ls_recv_active);
	if (hd->h_cmd == DLM_MSG)
		dlm_receive_message(ls, &p->message, nodeid);
	else
		dlm_receive_rcom(ls, &p->rcom, nodeid);
	up_read(&ls->ls_recv_active);

	dlm_put_lockspace(ls);
}
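
/* Called for a conversion that was waiting on a failed master: middle-mode
   (PR/CW) conversions get a faked -EINPROGRESS stub reply and are marked
   for special rebuild during recovery; other up-conversions are simply
   flagged for resend after recovery. */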
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}

/* A waiting lkb needs recovery if the master node has failed, or
   the master node is changing (only when no directory is used) */

static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (dlm_is_removed(ls, lkb->lkb_nodeid))
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
		return 1;

	return 0;
}

/* Recovery for locks that are waiting for replies from nodes that are now
   gone. We can just complete unlocks and cancels by faking a reply from the
   dead node. Requests and up-conversions we flag to be resent after
   recovery. Down-conversions can just be completed with a fake reply like
   unlocks. Conversions between PR and CW need special attention. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int wait_type, stub_unlock_result, stub_cancel_result;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received. In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = stub_unlock_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = stub_cancel_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
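
/* Find and take a reference on the next waiter that _pre flagged with
   DLM_IFL_RESEND; returns NULL when none remain. */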
static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	int found = 0;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			hold_lkb(lkb);
			found = 1;
			break;
		}
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (!found)
		lkb = NULL;
	return lkb;
}

/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
   master or dir-node for r. Processing the lkb may result in it being placed
   back on waiters. */

/* We do this after normal locking has been enabled and any saved messages
   (in requestqueue) have been processed. We should be confident that at
   this point we won't get or process a reply to any of these waiting
   operations. But, new ops may be coming in on the rsbs/locks here from
   userspace or remotely. */

/* There may have been an overlap unlock/cancel prior to recovery or after
   recovery. If before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   We can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock. First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
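
/* Remove from the given rsb queue each lkb that the test function selects,
   flag the rsb so dlm_grant_after_purge() can re-grant pending locks later,
   and drop the lkb reference, which should free it. */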
static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}

static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
}

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}

/* Get rid of locks held by nodes that are gone. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
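
/* Scan one hash bucket for an rsb whose locks were purged above; clear the
   RSB_LOCKS_PURGED flag and return the rsb held, or NULL if the bucket has
   none left. */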
static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
{
	struct dlm_rsb *r, *r_ret = NULL;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
		if (!rsb_flag(r, RSB_LOCKS_PURGED))
			continue;
		hold_rsb(r);
		rsb_clear_flag(r, RSB_LOCKS_PURGED);
		r_ret = r;
		break;
	}
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	return r_ret;
}

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}

static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
					 uint32_t remid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
			return lkb;
	}
	return NULL;
}

static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
				    uint32_t remid)
{
	struct dlm_lkb *lkb;

	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
	if (lkb)
		return lkb;
	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
	if (lkb)
		return lkb;
	return NULL;
}

/* needs at least dlm_rcom + rcom_lock */

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			     sizeof(struct rcom_lock);
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}

/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply. If not, we create a new lkb with
   the given values and send back our lkid. We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in. */

/* needs at least dlm_rcom + rcom_lock */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	rl->rl_result = cpu_to_le32(error);
	return error;
}

/* needs at least dlm_rcom + rcom_lock */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x",
			  le32_to_cpu(rl->rl_lkid));
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = le32_to_cpu(rl->rl_result);

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
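
/* Entry point for a userspace lock request: allocate an lkb, attach the
   dlm_user_args, run the normal request path, and track the lock on the
   owning process's locks list. */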
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	lkb->lkb_flags |= DLM_IFL_USER;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
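
/* Userspace convert: refresh the dlm_user_args from the new request (the
   callback params, and possibly an lvb, can change on a convert) and run
   the normal convert path. */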
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
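
/* Userspace unlock: run the normal unlock path, then park the lkb on the
   proc's unlocking list until the unlock completes. */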
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb);
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
   Regardless of what rsb queue the lock is on, it's removed and freed. */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_ast() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here. This assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to
   serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}
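
/* Send a purge request for a process's orphan locks to a remote node;
   it is delivered there through the DLM_MSG_PURGE case in
   _receive_message(). */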
static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid != dlm_our_nodeid()) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}