Merge tag 'for-linus-6.12a-rc2-tag' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-block.git] / fs / dlm / rcom.c
CommitLineData
2522fe45 1// SPDX-License-Identifier: GPL-2.0-only
e7fd4179
DT
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
dbcfc347 6** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
e7fd4179 7**
e7fd4179
DT
8**
9*******************************************************************************
10******************************************************************************/
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "midcomms.h"
17#include "rcom.h"
18#include "recover.h"
19#include "dir.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "util.h"
e7fd4179
DT
24
25static int rcom_response(struct dlm_ls *ls)
26{
27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28}
29
a070a91c 30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
c4f4e135
AA
31 struct dlm_rcom **rc_ret, char *mb, int mb_len,
32 uint64_t seq)
e7fd4179
DT
33{
34 struct dlm_rcom *rc;
e7fd4179
DT
35
36 rc = (struct dlm_rcom *) mb;
37
3428785a
AA
38 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
39 rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
40 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
41 rc->rc_header.h_length = cpu_to_le16(mb_len);
e7fd4179
DT
42 rc->rc_header.h_cmd = DLM_RCOM;
43
2f9dbeda 44 rc->rc_type = cpu_to_le32(type);
c4f4e135 45 rc->rc_seq = cpu_to_le64(seq);
38aa8b0c 46
e7fd4179 47 *rc_ret = rc;
a070a91c
AA
48}
49
50static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
c4f4e135
AA
51 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
52 uint64_t seq)
a070a91c
AA
53{
54 int mb_len = sizeof(struct dlm_rcom) + len;
55 struct dlm_mhandle *mh;
56 char *mb;
57
98808644 58 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
a070a91c
AA
59 if (!mh) {
60 log_print("%s to %d type %d len %d ENOBUFS",
61 __func__, to_nodeid, type, len);
62 return -ENOBUFS;
63 }
64
c4f4e135 65 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
a070a91c 66 *mh_ret = mh;
e7fd4179
DT
67 return 0;
68}
69
a070a91c
AA
70static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
71 int len, struct dlm_rcom **rc_ret,
c4f4e135 72 struct dlm_msg **msg_ret, uint64_t seq)
a070a91c
AA
73{
74 int mb_len = sizeof(struct dlm_rcom) + len;
8f2dc78d 75 struct dlm_msg *msg;
a070a91c
AA
76 char *mb;
77
98808644 78 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL);
8f2dc78d 79 if (!msg) {
a070a91c
AA
80 log_print("create_rcom to %d type %d len %d ENOBUFS",
81 to_nodeid, type, len);
82 return -ENOBUFS;
83 }
84
c4f4e135 85 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
8f2dc78d 86 *msg_ret = msg;
a070a91c
AA
87 return 0;
88}
89
/*
 * Commit a message created with create_rcom() to the midcomms layer.
 * @rc is accepted for symmetry with the create call but is not used.
 */
static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
{
	/* no extra name data is passed along with the commit */
	dlm_midcomms_commit_mhandle(mh, NULL, 0);
}
94
/*
 * Send a message created with create_rcom_stateless(): commit it to
 * lowcomms and then drop this function's reference on @msg.
 * @rc is not used.
 */
static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
{
	dlm_lowcomms_commit_msg(msg);
	/* the message must not be touched after the put */
	dlm_lowcomms_put_msg(msg);
}
100
757a4271
DT
101static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
102 uint32_t flags)
103{
104 rs->rs_flags = cpu_to_le32(flags);
105}
106
e7fd4179
DT
107/* When replying to a status request, a node also sends back its
108 configuration values. The requesting node then checks that the remote
109 node is configured the same way as itself. */
110
757a4271
DT
111static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
112 uint32_t num_slots)
e7fd4179 113{
93ff2971
AV
114 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
115 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
757a4271
DT
116
117 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
118 rf->rf_num_slots = cpu_to_le16(num_slots);
119 rf->rf_generation = cpu_to_le32(ls->ls_generation);
e7fd4179
DT
120}
121
757a4271 122static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
e7fd4179 123{
9e971b71
DT
124 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
125
3428785a 126 if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
9e971b71
DT
127 log_error(ls, "version mismatch: %x nodeid %d: %x",
128 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
3428785a 129 le32_to_cpu(rc->rc_header.h_version));
8b0e7b2c 130 return -EPROTO;
9e971b71
DT
131 }
132
93ff2971
AV
133 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
134 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
e7fd4179 135 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
93ff2971
AV
136 ls->ls_lvblen, ls->ls_exflags, nodeid,
137 le32_to_cpu(rf->rf_lvblen),
138 le32_to_cpu(rf->rf_lsflags));
8b0e7b2c 139 return -EPROTO;
e7fd4179
DT
140 }
141 return 0;
142}
143
2f9dbeda 144static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
98f176fb 145{
578acf9a 146 spin_lock_bh(&ls->ls_rcom_spin);
2f9dbeda 147 *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
98f176fb 148 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
578acf9a 149 spin_unlock_bh(&ls->ls_rcom_spin);
98f176fb
DT
150}
151
152static void disallow_sync_reply(struct dlm_ls *ls)
153{
578acf9a 154 spin_lock_bh(&ls->ls_rcom_spin);
98f176fb
DT
155 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
156 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
578acf9a 157 spin_unlock_bh(&ls->ls_rcom_spin);
98f176fb
DT
158}
159
757a4271
DT
160/*
161 * low nodeid gathers one slot value at a time from each node.
162 * it sets need_slots=0, and saves rf_our_slot returned from each
163 * rcom_config.
164 *
165 * other nodes gather all slot values at once from the low nodeid.
166 * they set need_slots=1, and ignore the rf_our_slot returned from each
167 * rcom_config. they use the rf_num_slots returned from the low
168 * node's rcom_config.
169 */
170
c4f4e135
AA
171int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
172 uint64_t seq)
e7fd4179
DT
173{
174 struct dlm_rcom *rc;
8f2dc78d 175 struct dlm_msg *msg;
e7fd4179
DT
176 int error = 0;
177
faa0f267 178 ls->ls_recover_nodeid = nodeid;
e7fd4179
DT
179
180 if (nodeid == dlm_our_nodeid()) {
4007685c 181 rc = ls->ls_recover_buf;
2f9dbeda 182 rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
e7fd4179
DT
183 goto out;
184 }
185
59661212 186retry:
a070a91c 187 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
c4f4e135
AA
188 sizeof(struct rcom_status), &rc, &msg,
189 seq);
e7fd4179
DT
190 if (error)
191 goto out;
98f176fb 192
757a4271
DT
193 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
194
98f176fb 195 allow_sync_reply(ls, &rc->rc_id);
d10a0b88 196 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
e7fd4179 197
88aa023a 198 send_rcom_stateless(msg, rc);
e7fd4179
DT
199
200 error = dlm_wait_function(ls, &rcom_response);
98f176fb 201 disallow_sync_reply(ls);
59661212 202 if (error == -ETIMEDOUT)
203 goto retry;
e7fd4179
DT
204 if (error)
205 goto out;
206
4007685c 207 rc = ls->ls_recover_buf;
e7fd4179 208
2f9dbeda 209 if (rc->rc_result == cpu_to_le32(-ESRCH)) {
e7fd4179
DT
210 /* we pretend the remote lockspace exists with 0 status */
211 log_debug(ls, "remote node %d not ready", nodeid);
212 rc->rc_result = 0;
757a4271
DT
213 error = 0;
214 } else {
215 error = check_rcom_config(ls, rc, nodeid);
216 }
217
e7fd4179
DT
218 /* the caller looks at rc_result for the remote recovery status */
219 out:
220 return error;
221}
222
11519351
AA
223static void receive_rcom_status(struct dlm_ls *ls,
224 const struct dlm_rcom *rc_in,
c4f4e135 225 uint64_t seq)
e7fd4179
DT
226{
227 struct dlm_rcom *rc;
757a4271
DT
228 struct rcom_status *rs;
229 uint32_t status;
3428785a 230 int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
757a4271 231 int len = sizeof(struct rcom_config);
8f2dc78d 232 struct dlm_msg *msg;
757a4271
DT
233 int num_slots = 0;
234 int error;
235
236 if (!dlm_slots_version(&rc_in->rc_header)) {
237 status = dlm_recover_status(ls);
238 goto do_create;
239 }
240
241 rs = (struct rcom_status *)rc_in->rc_buf;
e7fd4179 242
c07127b4 243 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
757a4271
DT
244 status = dlm_recover_status(ls);
245 goto do_create;
246 }
247
578acf9a 248 spin_lock_bh(&ls->ls_recover_lock);
757a4271
DT
249 status = ls->ls_recover_status;
250 num_slots = ls->ls_num_slots;
578acf9a 251 spin_unlock_bh(&ls->ls_recover_lock);
757a4271
DT
252 len += num_slots * sizeof(struct rcom_slot);
253
254 do_create:
a070a91c 255 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
c4f4e135 256 len, &rc, &msg, seq);
e7fd4179
DT
257 if (error)
258 return;
757a4271 259
4a99c3d9 260 rc->rc_id = rc_in->rc_id;
38aa8b0c 261 rc->rc_seq_reply = rc_in->rc_seq;
2f9dbeda 262 rc->rc_result = cpu_to_le32(status);
757a4271
DT
263
264 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
265
266 if (!num_slots)
267 goto do_send;
268
578acf9a 269 spin_lock_bh(&ls->ls_recover_lock);
757a4271 270 if (ls->ls_num_slots != num_slots) {
578acf9a 271 spin_unlock_bh(&ls->ls_recover_lock);
757a4271
DT
272 log_debug(ls, "receive_rcom_status num_slots %d to %d",
273 num_slots, ls->ls_num_slots);
274 rc->rc_result = 0;
275 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
276 goto do_send;
277 }
278
279 dlm_slots_copy_out(ls, rc);
578acf9a 280 spin_unlock_bh(&ls->ls_recover_lock);
e7fd4179 281
757a4271 282 do_send:
88aa023a 283 send_rcom_stateless(msg, rc);
e7fd4179
DT
284}
285
11519351 286static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
e7fd4179 287{
578acf9a 288 spin_lock_bh(&ls->ls_rcom_spin);
98f176fb 289 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
2f9dbeda 290 le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
98f176fb 291 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
2f9dbeda 292 le32_to_cpu(rc_in->rc_type),
3428785a 293 le32_to_cpu(rc_in->rc_header.h_nodeid),
2f9dbeda 294 (unsigned long long)le64_to_cpu(rc_in->rc_id),
57adf7ee 295 (unsigned long long)ls->ls_rcom_seq);
98f176fb 296 goto out;
4a99c3d9 297 }
3428785a
AA
298 memcpy(ls->ls_recover_buf, rc_in,
299 le16_to_cpu(rc_in->rc_header.h_length));
e7fd4179 300 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
98f176fb 301 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
e7fd4179 302 wake_up(&ls->ls_wait_general);
98f176fb 303 out:
578acf9a 304 spin_unlock_bh(&ls->ls_rcom_spin);
e7fd4179
DT
305}
306
c4f4e135
AA
307int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
308 int last_len, uint64_t seq)
e7fd4179 309{
a3d85fcf 310 struct dlm_mhandle *mh;
e7fd4179 311 struct dlm_rcom *rc;
4007685c 312 int error = 0;
e7fd4179 313
faa0f267 314 ls->ls_recover_nodeid = nodeid;
e7fd4179 315
59661212 316retry:
a3d85fcf
AA
317 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len,
318 &rc, &mh, seq);
e7fd4179
DT
319 if (error)
320 goto out;
321 memcpy(rc->rc_buf, last_name, last_len);
98f176fb
DT
322
323 allow_sync_reply(ls, &rc->rc_id);
d10a0b88 324 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
e7fd4179 325
a3d85fcf 326 send_rcom(mh, rc);
e7fd4179
DT
327
328 error = dlm_wait_function(ls, &rcom_response);
98f176fb 329 disallow_sync_reply(ls);
59661212 330 if (error == -ETIMEDOUT)
331 goto retry;
e7fd4179
DT
332 out:
333 return error;
334}
335
11519351 336static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
c4f4e135 337 uint64_t seq)
e7fd4179 338{
a3d85fcf 339 struct dlm_mhandle *mh;
e7fd4179 340 struct dlm_rcom *rc;
38aa8b0c 341 int error, inlen, outlen, nodeid;
e7fd4179 342
3428785a
AA
343 nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
344 inlen = le16_to_cpu(rc_in->rc_header.h_length) -
345 sizeof(struct dlm_rcom);
d10a0b88 346 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
e7fd4179 347
a3d85fcf
AA
348 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
349 &rc, &mh, seq);
e7fd4179
DT
350 if (error)
351 return;
4a99c3d9 352 rc->rc_id = rc_in->rc_id;
38aa8b0c 353 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179
DT
354
355 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
356 nodeid);
a3d85fcf 357 send_rcom(mh, rc);
e7fd4179
DT
358}
359
c4f4e135 360int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
e7fd4179
DT
361{
362 struct dlm_rcom *rc;
363 struct dlm_mhandle *mh;
364 struct dlm_ls *ls = r->res_ls;
365 int error;
366
367 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
c4f4e135 368 &rc, &mh, seq);
e7fd4179
DT
369 if (error)
370 goto out;
371 memcpy(rc->rc_buf, r->res_name, r->res_length);
2f9dbeda 372 rc->rc_id = cpu_to_le64(r->res_id);
e7fd4179 373
88aa023a 374 send_rcom(mh, rc);
e7fd4179
DT
375 out:
376 return error;
377}
378
11519351
AA
379static void receive_rcom_lookup(struct dlm_ls *ls,
380 const struct dlm_rcom *rc_in, uint64_t seq)
e7fd4179
DT
381{
382 struct dlm_rcom *rc;
383 struct dlm_mhandle *mh;
3428785a
AA
384 int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
385 int len = le16_to_cpu(rc_in->rc_header.h_length) -
386 sizeof(struct dlm_rcom);
e7fd4179 387
9250e523 388 /* Old code would send this special id to trigger a debug dump. */
2f9dbeda 389 if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
c04fecb4
DT
390 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
391 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
392 return;
393 }
394
c4f4e135
AA
395 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
396 seq);
f6089981
CIK
397 if (error)
398 return;
399
c04fecb4
DT
400 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
401 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
e7fd4179
DT
402 if (error)
403 ret_nodeid = error;
2f9dbeda 404 rc->rc_result = cpu_to_le32(ret_nodeid);
e7fd4179 405 rc->rc_id = rc_in->rc_id;
38aa8b0c 406 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179 407
88aa023a 408 send_rcom(mh, rc);
e7fd4179
DT
409}
410
11519351
AA
/*
 * Handle a DLM_RCOM_LOOKUP_REPLY by passing it unchanged to
 * dlm_recover_master_reply().
 */
static void receive_rcom_lookup_reply(struct dlm_ls *ls,
				      const struct dlm_rcom *rc_in)
{
	dlm_recover_master_reply(ls, rc_in);
}
416
417static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
418 struct rcom_lock *rl)
419{
420 memset(rl, 0, sizeof(*rl));
421
163a1859
AV
422 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
423 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
424 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
8a39dcd9 425 rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb));
163a1859 426 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
e7fd4179
DT
427 rl->rl_rqmode = lkb->lkb_rqmode;
428 rl->rl_grmode = lkb->lkb_grmode;
429 rl->rl_status = lkb->lkb_status;
163a1859 430 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
e7fd4179 431
e5dae548 432 if (lkb->lkb_bastfn)
8304d6f2 433 rl->rl_asts |= DLM_CB_BAST;
e5dae548 434 if (lkb->lkb_astfn)
8304d6f2 435 rl->rl_asts |= DLM_CB_CAST;
e7fd4179 436
163a1859 437 rl->rl_namelen = cpu_to_le16(r->res_length);
e7fd4179
DT
438 memcpy(rl->rl_name, r->res_name, r->res_length);
439
440 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
441 If so, receive_rcom_lock_args() won't take this copy. */
442
443 if (lkb->lkb_lvbptr)
444 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
445}
446
c4f4e135 447int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
e7fd4179
DT
448{
449 struct dlm_ls *ls = r->res_ls;
450 struct dlm_rcom *rc;
451 struct dlm_mhandle *mh;
452 struct rcom_lock *rl;
453 int error, len = sizeof(struct rcom_lock);
454
455 if (lkb->lkb_lvbptr)
456 len += ls->ls_lvblen;
457
c4f4e135
AA
458 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
459 seq);
e7fd4179
DT
460 if (error)
461 goto out;
462
463 rl = (struct rcom_lock *) rc->rc_buf;
464 pack_rcom_lock(r, lkb, rl);
e425ac99 465 rc->rc_id = cpu_to_le64((uintptr_t)r);
e7fd4179 466
88aa023a 467 send_rcom(mh, rc);
e7fd4179
DT
468 out:
469 return error;
470}
471
ae773d0b 472/* needs at least dlm_rcom + rcom_lock */
11519351 473static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in,
c4f4e135 474 uint64_t seq)
e7fd4179 475{
b9d2f6ad
AA
476 __le32 rl_remid, rl_result;
477 struct rcom_lock *rl;
e7fd4179
DT
478 struct dlm_rcom *rc;
479 struct dlm_mhandle *mh;
3428785a 480 int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
e7fd4179 481
b9d2f6ad 482 dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result);
e7fd4179
DT
483
484 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
c4f4e135 485 sizeof(struct rcom_lock), &rc, &mh, seq);
e7fd4179
DT
486 if (error)
487 return;
488
e7fd4179 489 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
b9d2f6ad
AA
490 rl = (struct rcom_lock *)rc->rc_buf;
491 /* set rl_remid and rl_result from dlm_recover_master_copy() */
492 rl->rl_remid = rl_remid;
493 rl->rl_result = rl_result;
494
e7fd4179 495 rc->rc_id = rc_in->rc_id;
38aa8b0c 496 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179 497
88aa023a 498 send_rcom(mh, rc);
e7fd4179
DT
499}
500
c36258b5
DT
501/* If the lockspace doesn't exist then still send a status message
502 back; it's possible that it just doesn't have its global_id yet. */
503
11519351 504int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in)
e7fd4179
DT
505{
506 struct dlm_rcom *rc;
1babdb45 507 struct rcom_config *rf;
e7fd4179
DT
508 struct dlm_mhandle *mh;
509 char *mb;
1babdb45 510 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
e7fd4179 511
98808644 512 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb);
e7fd4179
DT
513 if (!mh)
514 return -ENOBUFS;
e7fd4179
DT
515
516 rc = (struct dlm_rcom *) mb;
517
3428785a 518 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
8e2e4086 519 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
3428785a
AA
520 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
521 rc->rc_header.h_length = cpu_to_le16(mb_len);
e7fd4179
DT
522 rc->rc_header.h_cmd = DLM_RCOM;
523
2f9dbeda 524 rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
f5888750 525 rc->rc_id = rc_in->rc_id;
38aa8b0c 526 rc->rc_seq_reply = rc_in->rc_seq;
2f9dbeda 527 rc->rc_result = cpu_to_le32(-ESRCH);
e7fd4179 528
1babdb45 529 rf = (struct rcom_config *) rc->rc_buf;
93ff2971 530 rf->rf_lvblen = cpu_to_le32(~0U);
1babdb45 531
e01c4b7b 532 dlm_midcomms_commit_mhandle(mh, NULL, 0);
e7fd4179
DT
533
534 return 0;
535}
536
c04fecb4
DT
/*
 * Ignore messages for stage Y before we set
 * recover_status bit for stage X:
 *
 * recover_status = 0
 *
 * dlm_recover_members()
 * - send nothing
 * - recv nothing
 * - ignore NAMES, NAMES_REPLY
 * - ignore LOOKUP, LOOKUP_REPLY
 * - ignore LOCK, LOCK_REPLY
 *
 * recover_status |= NODES
 *
 * dlm_recover_members_wait()
 *
 * dlm_recover_directory()
 * - send NAMES
 * - recv NAMES_REPLY
 * - ignore LOOKUP, LOOKUP_REPLY
 * - ignore LOCK, LOCK_REPLY
 *
 * recover_status |= DIR
 *
 * dlm_recover_directory_wait()
 *
 * dlm_recover_masters()
 * - send LOOKUP
 * - recv LOOKUP_REPLY
 *
 * dlm_recover_locks()
 * - send LOCK
 * - recv LOCK_REPLY
 *
 * recover_status |= LOCKS
 *
 * dlm_recover_locks_wait()
 *
 * recover_status |= DONE
 */
578
d6e24788
DT
579/* Called by dlm_recv; corresponds to dlm_receive_message() but special
580 recovery-only comms are sent through here. */
581
11519351 582void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
38aa8b0c 583{
d6e24788 584 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
c04fecb4 585 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
4875647a 586 uint32_t status;
38aa8b0c 587 uint64_t seq;
38aa8b0c
DT
588
589 switch (rc->rc_type) {
2f9dbeda 590 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
c04fecb4
DT
591 reply = 1;
592 break;
2f9dbeda 593 case cpu_to_le32(DLM_RCOM_NAMES):
c04fecb4
DT
594 names = 1;
595 break;
2f9dbeda 596 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
c04fecb4
DT
597 names = 1;
598 reply = 1;
599 break;
2f9dbeda 600 case cpu_to_le32(DLM_RCOM_LOOKUP):
c04fecb4
DT
601 lookup = 1;
602 break;
2f9dbeda 603 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
c04fecb4
DT
604 lookup = 1;
605 reply = 1;
606 break;
2f9dbeda 607 case cpu_to_le32(DLM_RCOM_LOCK):
4875647a
DT
608 lock = 1;
609 break;
2f9dbeda 610 case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
4875647a
DT
611 lock = 1;
612 reply = 1;
613 break;
90db4f8b 614 }
38aa8b0c 615
578acf9a 616 spin_lock_bh(&ls->ls_recover_lock);
4875647a 617 status = ls->ls_recover_status;
3e973671 618 stop = dlm_recovery_stopped(ls);
d6e24788 619 seq = ls->ls_recover_seq;
578acf9a 620 spin_unlock_bh(&ls->ls_recover_lock);
ae773d0b 621
2f9dbeda 622 if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
c04fecb4
DT
623 goto ignore;
624
2f9dbeda 625 if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
c04fecb4
DT
626 goto ignore;
627
628 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
629 goto ignore;
630
631 if (!(status & DLM_RS_DIR) && (lookup || lock))
632 goto ignore;
e7fd4179 633
e7fd4179 634 switch (rc->rc_type) {
2f9dbeda 635 case cpu_to_le32(DLM_RCOM_STATUS):
c4f4e135 636 receive_rcom_status(ls, rc, seq);
e7fd4179
DT
637 break;
638
2f9dbeda 639 case cpu_to_le32(DLM_RCOM_NAMES):
c4f4e135 640 receive_rcom_names(ls, rc, seq);
e7fd4179
DT
641 break;
642
2f9dbeda 643 case cpu_to_le32(DLM_RCOM_LOOKUP):
c4f4e135 644 receive_rcom_lookup(ls, rc, seq);
e7fd4179
DT
645 break;
646
2f9dbeda 647 case cpu_to_le32(DLM_RCOM_LOCK):
3428785a 648 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
ae773d0b 649 goto Eshort;
c4f4e135 650 receive_rcom_lock(ls, rc, seq);
e7fd4179
DT
651 break;
652
2f9dbeda 653 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
dbcfc347 654 receive_sync_reply(ls, rc);
e7fd4179
DT
655 break;
656
2f9dbeda 657 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
dbcfc347 658 receive_sync_reply(ls, rc);
e7fd4179
DT
659 break;
660
2f9dbeda 661 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
e7fd4179
DT
662 receive_rcom_lookup_reply(ls, rc);
663 break;
664
2f9dbeda 665 case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
3428785a 666 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
ae773d0b 667 goto Eshort;
c4f4e135 668 dlm_recover_process_copy(ls, rc, seq);
e7fd4179
DT
669 break;
670
671 default:
2f9dbeda
AA
672 log_error(ls, "receive_rcom bad type %d",
673 le32_to_cpu(rc->rc_type));
e7fd4179 674 }
c04fecb4
DT
675 return;
676
677ignore:
678 log_limit(ls, "dlm_receive_rcom ignore msg %d "
679 "from %d %llu %llu recover seq %llu sts %x gen %u",
2f9dbeda 680 le32_to_cpu(rc->rc_type),
c04fecb4 681 nodeid,
2f9dbeda
AA
682 (unsigned long long)le64_to_cpu(rc->rc_seq),
683 (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
c04fecb4
DT
684 (unsigned long long)seq,
685 status, ls->ls_generation);
c36258b5 686 return;
ae773d0b 687Eshort:
c04fecb4 688 log_error(ls, "recovery message %d from %d is too short",
2f9dbeda 689 le32_to_cpu(rc->rc_type), nodeid);
e7fd4179
DT
690}
691