Merge tag 'trace-v6.3-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/trace...
[linux-block.git] / fs / dlm / rcom.c
CommitLineData
2522fe45 1// SPDX-License-Identifier: GPL-2.0-only
e7fd4179
DT
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
dbcfc347 6** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
e7fd4179 7**
e7fd4179
DT
8**
9*******************************************************************************
10******************************************************************************/
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "midcomms.h"
17#include "rcom.h"
18#include "recover.h"
19#include "dir.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "util.h"
e7fd4179
DT
24
25static int rcom_response(struct dlm_ls *ls)
26{
27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28}
29
a070a91c
AA
30static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31 struct dlm_rcom **rc_ret, char *mb, int mb_len)
e7fd4179
DT
32{
33 struct dlm_rcom *rc;
e7fd4179
DT
34
35 rc = (struct dlm_rcom *) mb;
36
3428785a
AA
37 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
38 rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
39 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
40 rc->rc_header.h_length = cpu_to_le16(mb_len);
e7fd4179
DT
41 rc->rc_header.h_cmd = DLM_RCOM;
42
2f9dbeda 43 rc->rc_type = cpu_to_le32(type);
e7fd4179 44
38aa8b0c 45 spin_lock(&ls->ls_recover_lock);
2f9dbeda 46 rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
38aa8b0c
DT
47 spin_unlock(&ls->ls_recover_lock);
48
e7fd4179 49 *rc_ret = rc;
a070a91c
AA
50}
51
52static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
53 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
54{
55 int mb_len = sizeof(struct dlm_rcom) + len;
56 struct dlm_mhandle *mh;
57 char *mb;
58
59 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
60 if (!mh) {
61 log_print("%s to %d type %d len %d ENOBUFS",
62 __func__, to_nodeid, type, len);
63 return -ENOBUFS;
64 }
65
66 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
67 *mh_ret = mh;
e7fd4179
DT
68 return 0;
69}
70
a070a91c
AA
71static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
72 int len, struct dlm_rcom **rc_ret,
8f2dc78d 73 struct dlm_msg **msg_ret)
a070a91c
AA
74{
75 int mb_len = sizeof(struct dlm_rcom) + len;
8f2dc78d 76 struct dlm_msg *msg;
a070a91c
AA
77 char *mb;
78
8f2dc78d
AA
79 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
80 NULL, NULL);
81 if (!msg) {
a070a91c
AA
82 log_print("create_rcom to %d type %d len %d ENOBUFS",
83 to_nodeid, type, len);
84 return -ENOBUFS;
85 }
86
87 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
8f2dc78d 88 *msg_ret = msg;
a070a91c
AA
89 return 0;
90}
91
88aa023a 92static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
a070a91c 93{
e01c4b7b 94 dlm_midcomms_commit_mhandle(mh, NULL, 0);
a070a91c
AA
95}
96
88aa023a 97static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
a070a91c 98{
8f2dc78d
AA
99 dlm_lowcomms_commit_msg(msg);
100 dlm_lowcomms_put_msg(msg);
e7fd4179
DT
101}
102
757a4271
DT
103static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
104 uint32_t flags)
105{
106 rs->rs_flags = cpu_to_le32(flags);
107}
108
e7fd4179
DT
109/* When replying to a status request, a node also sends back its
110 configuration values. The requesting node then checks that the remote
111 node is configured the same way as itself. */
112
757a4271
DT
113static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
114 uint32_t num_slots)
e7fd4179 115{
93ff2971
AV
116 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
117 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
757a4271
DT
118
119 rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
120 rf->rf_num_slots = cpu_to_le16(num_slots);
121 rf->rf_generation = cpu_to_le32(ls->ls_generation);
e7fd4179
DT
122}
123
757a4271 124static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
e7fd4179 125{
9e971b71
DT
126 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
127
3428785a 128 if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
9e971b71
DT
129 log_error(ls, "version mismatch: %x nodeid %d: %x",
130 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
3428785a 131 le32_to_cpu(rc->rc_header.h_version));
8b0e7b2c 132 return -EPROTO;
9e971b71
DT
133 }
134
93ff2971
AV
135 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
136 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
e7fd4179 137 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
93ff2971
AV
138 ls->ls_lvblen, ls->ls_exflags, nodeid,
139 le32_to_cpu(rf->rf_lvblen),
140 le32_to_cpu(rf->rf_lsflags));
8b0e7b2c 141 return -EPROTO;
e7fd4179
DT
142 }
143 return 0;
144}
145
2f9dbeda 146static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
98f176fb
DT
147{
148 spin_lock(&ls->ls_rcom_spin);
2f9dbeda 149 *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
98f176fb
DT
150 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
151 spin_unlock(&ls->ls_rcom_spin);
152}
153
154static void disallow_sync_reply(struct dlm_ls *ls)
155{
156 spin_lock(&ls->ls_rcom_spin);
157 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
158 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
159 spin_unlock(&ls->ls_rcom_spin);
160}
161
757a4271
DT
162/*
163 * low nodeid gathers one slot value at a time from each node.
164 * it sets need_slots=0, and saves rf_our_slot returned from each
165 * rcom_config.
166 *
167 * other nodes gather all slot values at once from the low nodeid.
168 * they set need_slots=1, and ignore the rf_our_slot returned from each
169 * rcom_config. they use the rf_num_slots returned from the low
170 * node's rcom_config.
171 */
172
173int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
e7fd4179
DT
174{
175 struct dlm_rcom *rc;
8f2dc78d 176 struct dlm_msg *msg;
e7fd4179
DT
177 int error = 0;
178
faa0f267 179 ls->ls_recover_nodeid = nodeid;
e7fd4179
DT
180
181 if (nodeid == dlm_our_nodeid()) {
4007685c 182 rc = ls->ls_recover_buf;
2f9dbeda 183 rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
e7fd4179
DT
184 goto out;
185 }
186
59661212 187retry:
a070a91c 188 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
8f2dc78d 189 sizeof(struct rcom_status), &rc, &msg);
e7fd4179
DT
190 if (error)
191 goto out;
98f176fb 192
757a4271
DT
193 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
194
98f176fb 195 allow_sync_reply(ls, &rc->rc_id);
d10a0b88 196 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
e7fd4179 197
88aa023a 198 send_rcom_stateless(msg, rc);
e7fd4179
DT
199
200 error = dlm_wait_function(ls, &rcom_response);
98f176fb 201 disallow_sync_reply(ls);
59661212 202 if (error == -ETIMEDOUT)
203 goto retry;
e7fd4179
DT
204 if (error)
205 goto out;
206
4007685c 207 rc = ls->ls_recover_buf;
e7fd4179 208
2f9dbeda 209 if (rc->rc_result == cpu_to_le32(-ESRCH)) {
e7fd4179
DT
210 /* we pretend the remote lockspace exists with 0 status */
211 log_debug(ls, "remote node %d not ready", nodeid);
212 rc->rc_result = 0;
757a4271
DT
213 error = 0;
214 } else {
215 error = check_rcom_config(ls, rc, nodeid);
216 }
217
e7fd4179
DT
218 /* the caller looks at rc_result for the remote recovery status */
219 out:
220 return error;
221}
222
223static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224{
225 struct dlm_rcom *rc;
757a4271
DT
226 struct rcom_status *rs;
227 uint32_t status;
3428785a 228 int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
757a4271 229 int len = sizeof(struct rcom_config);
8f2dc78d 230 struct dlm_msg *msg;
757a4271
DT
231 int num_slots = 0;
232 int error;
233
234 if (!dlm_slots_version(&rc_in->rc_header)) {
235 status = dlm_recover_status(ls);
236 goto do_create;
237 }
238
239 rs = (struct rcom_status *)rc_in->rc_buf;
e7fd4179 240
c07127b4 241 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
757a4271
DT
242 status = dlm_recover_status(ls);
243 goto do_create;
244 }
245
246 spin_lock(&ls->ls_recover_lock);
247 status = ls->ls_recover_status;
248 num_slots = ls->ls_num_slots;
249 spin_unlock(&ls->ls_recover_lock);
250 len += num_slots * sizeof(struct rcom_slot);
251
252 do_create:
a070a91c 253 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
8f2dc78d 254 len, &rc, &msg);
e7fd4179
DT
255 if (error)
256 return;
757a4271 257
4a99c3d9 258 rc->rc_id = rc_in->rc_id;
38aa8b0c 259 rc->rc_seq_reply = rc_in->rc_seq;
2f9dbeda 260 rc->rc_result = cpu_to_le32(status);
757a4271
DT
261
262 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
263
264 if (!num_slots)
265 goto do_send;
266
267 spin_lock(&ls->ls_recover_lock);
268 if (ls->ls_num_slots != num_slots) {
269 spin_unlock(&ls->ls_recover_lock);
270 log_debug(ls, "receive_rcom_status num_slots %d to %d",
271 num_slots, ls->ls_num_slots);
272 rc->rc_result = 0;
273 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
274 goto do_send;
275 }
276
277 dlm_slots_copy_out(ls, rc);
278 spin_unlock(&ls->ls_recover_lock);
e7fd4179 279
757a4271 280 do_send:
88aa023a 281 send_rcom_stateless(msg, rc);
e7fd4179
DT
282}
283
4a99c3d9 284static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
e7fd4179 285{
98f176fb
DT
286 spin_lock(&ls->ls_rcom_spin);
287 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
2f9dbeda 288 le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
98f176fb 289 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
2f9dbeda 290 le32_to_cpu(rc_in->rc_type),
3428785a 291 le32_to_cpu(rc_in->rc_header.h_nodeid),
2f9dbeda 292 (unsigned long long)le64_to_cpu(rc_in->rc_id),
57adf7ee 293 (unsigned long long)ls->ls_rcom_seq);
98f176fb 294 goto out;
4a99c3d9 295 }
3428785a
AA
296 memcpy(ls->ls_recover_buf, rc_in,
297 le16_to_cpu(rc_in->rc_header.h_length));
e7fd4179 298 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
98f176fb 299 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
e7fd4179 300 wake_up(&ls->ls_wait_general);
98f176fb
DT
301 out:
302 spin_unlock(&ls->ls_rcom_spin);
e7fd4179
DT
303}
304
305int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
306{
307 struct dlm_rcom *rc;
8f2dc78d 308 struct dlm_msg *msg;
4007685c 309 int error = 0;
e7fd4179 310
faa0f267 311 ls->ls_recover_nodeid = nodeid;
e7fd4179 312
59661212 313retry:
a070a91c 314 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
8f2dc78d 315 &rc, &msg);
e7fd4179
DT
316 if (error)
317 goto out;
318 memcpy(rc->rc_buf, last_name, last_len);
98f176fb
DT
319
320 allow_sync_reply(ls, &rc->rc_id);
d10a0b88 321 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
e7fd4179 322
88aa023a 323 send_rcom_stateless(msg, rc);
e7fd4179
DT
324
325 error = dlm_wait_function(ls, &rcom_response);
98f176fb 326 disallow_sync_reply(ls);
59661212 327 if (error == -ETIMEDOUT)
328 goto retry;
e7fd4179
DT
329 out:
330 return error;
331}
332
333static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
334{
335 struct dlm_rcom *rc;
38aa8b0c 336 int error, inlen, outlen, nodeid;
8f2dc78d 337 struct dlm_msg *msg;
e7fd4179 338
3428785a
AA
339 nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
340 inlen = le16_to_cpu(rc_in->rc_header.h_length) -
341 sizeof(struct dlm_rcom);
d10a0b88 342 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
e7fd4179 343
a070a91c 344 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
8f2dc78d 345 &rc, &msg);
e7fd4179
DT
346 if (error)
347 return;
4a99c3d9 348 rc->rc_id = rc_in->rc_id;
38aa8b0c 349 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179
DT
350
351 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
352 nodeid);
88aa023a 353 send_rcom_stateless(msg, rc);
e7fd4179
DT
354}
355
e7fd4179
DT
356int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
357{
358 struct dlm_rcom *rc;
359 struct dlm_mhandle *mh;
360 struct dlm_ls *ls = r->res_ls;
361 int error;
362
363 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
364 &rc, &mh);
365 if (error)
366 goto out;
367 memcpy(rc->rc_buf, r->res_name, r->res_length);
2f9dbeda 368 rc->rc_id = cpu_to_le64(r->res_id);
e7fd4179 369
88aa023a 370 send_rcom(mh, rc);
e7fd4179
DT
371 out:
372 return error;
373}
374
375static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
376{
377 struct dlm_rcom *rc;
378 struct dlm_mhandle *mh;
3428785a
AA
379 int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
380 int len = le16_to_cpu(rc_in->rc_header.h_length) -
381 sizeof(struct dlm_rcom);
e7fd4179 382
9250e523 383 /* Old code would send this special id to trigger a debug dump. */
2f9dbeda 384 if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
c04fecb4
DT
385 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
386 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
387 return;
388 }
389
f6089981
CIK
390 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
391 if (error)
392 return;
393
c04fecb4
DT
394 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
395 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
e7fd4179
DT
396 if (error)
397 ret_nodeid = error;
2f9dbeda 398 rc->rc_result = cpu_to_le32(ret_nodeid);
e7fd4179 399 rc->rc_id = rc_in->rc_id;
38aa8b0c 400 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179 401
88aa023a 402 send_rcom(mh, rc);
e7fd4179
DT
403}
404
405static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
406{
407 dlm_recover_master_reply(ls, rc_in);
408}
409
410static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
411 struct rcom_lock *rl)
412{
413 memset(rl, 0, sizeof(*rl));
414
163a1859
AV
415 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
416 rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
417 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
418 rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
419 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
e7fd4179
DT
420 rl->rl_rqmode = lkb->lkb_rqmode;
421 rl->rl_grmode = lkb->lkb_grmode;
422 rl->rl_status = lkb->lkb_status;
163a1859 423 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
e7fd4179 424
e5dae548 425 if (lkb->lkb_bastfn)
8304d6f2 426 rl->rl_asts |= DLM_CB_BAST;
e5dae548 427 if (lkb->lkb_astfn)
8304d6f2 428 rl->rl_asts |= DLM_CB_CAST;
e7fd4179 429
163a1859 430 rl->rl_namelen = cpu_to_le16(r->res_length);
e7fd4179
DT
431 memcpy(rl->rl_name, r->res_name, r->res_length);
432
433 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
434 If so, receive_rcom_lock_args() won't take this copy. */
435
436 if (lkb->lkb_lvbptr)
437 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
438}
439
440int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
441{
442 struct dlm_ls *ls = r->res_ls;
443 struct dlm_rcom *rc;
444 struct dlm_mhandle *mh;
445 struct rcom_lock *rl;
446 int error, len = sizeof(struct rcom_lock);
447
448 if (lkb->lkb_lvbptr)
449 len += ls->ls_lvblen;
450
451 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
452 if (error)
453 goto out;
454
455 rl = (struct rcom_lock *) rc->rc_buf;
456 pack_rcom_lock(r, lkb, rl);
e425ac99 457 rc->rc_id = cpu_to_le64((uintptr_t)r);
e7fd4179 458
88aa023a 459 send_rcom(mh, rc);
e7fd4179
DT
460 out:
461 return error;
462}
463
ae773d0b 464/* needs at least dlm_rcom + rcom_lock */
e7fd4179
DT
465static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
466{
467 struct dlm_rcom *rc;
468 struct dlm_mhandle *mh;
3428785a 469 int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
e7fd4179
DT
470
471 dlm_recover_master_copy(ls, rc_in);
472
473 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
474 sizeof(struct rcom_lock), &rc, &mh);
475 if (error)
476 return;
477
478 /* We send back the same rcom_lock struct we received, but
479 dlm_recover_master_copy() has filled in rl_remid and rl_result */
480
481 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
482 rc->rc_id = rc_in->rc_id;
38aa8b0c 483 rc->rc_seq_reply = rc_in->rc_seq;
e7fd4179 484
88aa023a 485 send_rcom(mh, rc);
e7fd4179
DT
486}
487
c36258b5
DT
488/* If the lockspace doesn't exist then still send a status message
489 back; it's possible that it just doesn't have its global_id yet. */
490
491int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
e7fd4179
DT
492{
493 struct dlm_rcom *rc;
1babdb45 494 struct rcom_config *rf;
e7fd4179
DT
495 struct dlm_mhandle *mh;
496 char *mb;
1babdb45 497 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
e7fd4179 498
a070a91c 499 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
e7fd4179
DT
500 if (!mh)
501 return -ENOBUFS;
e7fd4179
DT
502
503 rc = (struct dlm_rcom *) mb;
504
3428785a 505 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
8e2e4086 506 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
3428785a
AA
507 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
508 rc->rc_header.h_length = cpu_to_le16(mb_len);
e7fd4179
DT
509 rc->rc_header.h_cmd = DLM_RCOM;
510
2f9dbeda 511 rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
f5888750 512 rc->rc_id = rc_in->rc_id;
38aa8b0c 513 rc->rc_seq_reply = rc_in->rc_seq;
2f9dbeda 514 rc->rc_result = cpu_to_le32(-ESRCH);
e7fd4179 515
1babdb45 516 rf = (struct rcom_config *) rc->rc_buf;
93ff2971 517 rf->rf_lvblen = cpu_to_le32(~0U);
1babdb45 518
e01c4b7b 519 dlm_midcomms_commit_mhandle(mh, NULL, 0);
e7fd4179
DT
520
521 return 0;
522}
523
c04fecb4
DT
524/*
525 * Ignore messages for stage Y before we set
526 * recover_status bit for stage X:
527 *
528 * recover_status = 0
529 *
530 * dlm_recover_members()
531 * - send nothing
532 * - recv nothing
533 * - ignore NAMES, NAMES_REPLY
534 * - ignore LOOKUP, LOOKUP_REPLY
535 * - ignore LOCK, LOCK_REPLY
536 *
537 * recover_status |= NODES
538 *
539 * dlm_recover_members_wait()
540 *
541 * dlm_recover_directory()
542 * - send NAMES
543 * - recv NAMES_REPLY
544 * - ignore LOOKUP, LOOKUP_REPLY
545 * - ignore LOCK, LOCK_REPLY
546 *
547 * recover_status |= DIR
548 *
549 * dlm_recover_directory_wait()
550 *
551 * dlm_recover_masters()
552 * - send LOOKUP
553 * - recv LOOKUP_REPLY
554 *
555 * dlm_recover_locks()
556 * - send LOCKS
557 * - recv LOCKS_REPLY
558 *
559 * recover_status |= LOCKS
560 *
561 * dlm_recover_locks_wait()
562 *
563 * recover_status |= DONE
564 */
565
d6e24788
DT
566/* Called by dlm_recv; corresponds to dlm_receive_message() but special
567 recovery-only comms are sent through here. */
568
569void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
38aa8b0c 570{
d6e24788 571 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
c04fecb4 572 int stop, reply = 0, names = 0, lookup = 0, lock = 0;
4875647a 573 uint32_t status;
38aa8b0c 574 uint64_t seq;
38aa8b0c
DT
575
576 switch (rc->rc_type) {
2f9dbeda 577 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
c04fecb4
DT
578 reply = 1;
579 break;
2f9dbeda 580 case cpu_to_le32(DLM_RCOM_NAMES):
c04fecb4
DT
581 names = 1;
582 break;
2f9dbeda 583 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
c04fecb4
DT
584 names = 1;
585 reply = 1;
586 break;
2f9dbeda 587 case cpu_to_le32(DLM_RCOM_LOOKUP):
c04fecb4
DT
588 lookup = 1;
589 break;
2f9dbeda 590 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
c04fecb4
DT
591 lookup = 1;
592 reply = 1;
593 break;
2f9dbeda 594 case cpu_to_le32(DLM_RCOM_LOCK):
4875647a
DT
595 lock = 1;
596 break;
2f9dbeda 597 case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
4875647a
DT
598 lock = 1;
599 reply = 1;
600 break;
90db4f8b 601 }
38aa8b0c 602
d6e24788 603 spin_lock(&ls->ls_recover_lock);
4875647a 604 status = ls->ls_recover_status;
3e973671 605 stop = dlm_recovery_stopped(ls);
d6e24788
DT
606 seq = ls->ls_recover_seq;
607 spin_unlock(&ls->ls_recover_lock);
ae773d0b 608
2f9dbeda 609 if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
c04fecb4
DT
610 goto ignore;
611
2f9dbeda 612 if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
c04fecb4
DT
613 goto ignore;
614
615 if (!(status & DLM_RS_NODES) && (names || lookup || lock))
616 goto ignore;
617
618 if (!(status & DLM_RS_DIR) && (lookup || lock))
619 goto ignore;
e7fd4179 620
e7fd4179 621 switch (rc->rc_type) {
2f9dbeda 622 case cpu_to_le32(DLM_RCOM_STATUS):
e7fd4179
DT
623 receive_rcom_status(ls, rc);
624 break;
625
2f9dbeda 626 case cpu_to_le32(DLM_RCOM_NAMES):
e7fd4179
DT
627 receive_rcom_names(ls, rc);
628 break;
629
2f9dbeda 630 case cpu_to_le32(DLM_RCOM_LOOKUP):
e7fd4179
DT
631 receive_rcom_lookup(ls, rc);
632 break;
633
2f9dbeda 634 case cpu_to_le32(DLM_RCOM_LOCK):
3428785a 635 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
ae773d0b 636 goto Eshort;
e7fd4179
DT
637 receive_rcom_lock(ls, rc);
638 break;
639
2f9dbeda 640 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
dbcfc347 641 receive_sync_reply(ls, rc);
e7fd4179
DT
642 break;
643
2f9dbeda 644 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
dbcfc347 645 receive_sync_reply(ls, rc);
e7fd4179
DT
646 break;
647
2f9dbeda 648 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
e7fd4179
DT
649 receive_rcom_lookup_reply(ls, rc);
650 break;
651
2f9dbeda 652 case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
3428785a 653 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
ae773d0b 654 goto Eshort;
dbcfc347 655 dlm_recover_process_copy(ls, rc);
e7fd4179
DT
656 break;
657
658 default:
2f9dbeda
AA
659 log_error(ls, "receive_rcom bad type %d",
660 le32_to_cpu(rc->rc_type));
e7fd4179 661 }
c04fecb4
DT
662 return;
663
664ignore:
665 log_limit(ls, "dlm_receive_rcom ignore msg %d "
666 "from %d %llu %llu recover seq %llu sts %x gen %u",
2f9dbeda 667 le32_to_cpu(rc->rc_type),
c04fecb4 668 nodeid,
2f9dbeda
AA
669 (unsigned long long)le64_to_cpu(rc->rc_seq),
670 (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
c04fecb4
DT
671 (unsigned long long)seq,
672 status, ls->ls_generation);
c36258b5 673 return;
ae773d0b 674Eshort:
c04fecb4 675 log_error(ls, "recovery message %d from %d is too short",
2f9dbeda 676 le32_to_cpu(rc->rc_type), nodeid);
e7fd4179
DT
677}
678