Merge tag 'locking-core-2023-05-05' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-block.git] / fs / ceph / mds_client.c
CommitLineData
b2441318 1// SPDX-License-Identifier: GPL-2.0
3d14c5d2 2#include <linux/ceph/ceph_debug.h>
2f2dc053 3
496e5955 4#include <linux/fs.h>
2f2dc053 5#include <linux/wait.h>
5a0e3ad6 6#include <linux/slab.h>
54008399 7#include <linux/gfp.h>
2f2dc053 8#include <linux/sched.h>
3d14c5d2
YS
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
3e0708b9 11#include <linux/ratelimit.h>
9ba1e224 12#include <linux/bits.h>
70c94820 13#include <linux/ktime.h>
d517b398 14#include <linux/bitmap.h>
2f2dc053 15
2f2dc053 16#include "super.h"
3d14c5d2
YS
17#include "mds_client.h"
18
1fe60e51 19#include <linux/ceph/ceph_features.h>
3d14c5d2
YS
20#include <linux/ceph/messenger.h>
21#include <linux/ceph/decode.h>
22#include <linux/ceph/pagelist.h>
23#include <linux/ceph/auth.h>
24#include <linux/ceph/debugfs.h>
2f2dc053 25
81c5a148
YZ
26#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
27
2f2dc053
SW
28/*
29 * A cluster of MDS (metadata server) daemons is responsible for
30 * managing the file system namespace (the directory hierarchy and
31 * inodes) and for coordinating shared access to storage. Metadata is
32 * partitioning hierarchically across a number of servers, and that
33 * partition varies over time as the cluster adjusts the distribution
34 * in order to balance load.
35 *
36 * The MDS client is primarily responsible to managing synchronous
37 * metadata requests for operations like open, unlink, and so forth.
38 * If there is a MDS failure, we find out about it when we (possibly
39 * request and) receive a new MDS map, and can resubmit affected
40 * requests.
41 *
42 * For the most part, though, we take advantage of a lossless
43 * communications channel to the MDS, and do not need to worry about
44 * timing out or resubmitting requests.
45 *
46 * We maintain a stateful "session" with each MDS we interact with.
47 * Within each session, we sent periodic heartbeat messages to ensure
48 * any capabilities or leases we have been issues remain valid. If
49 * the session times out and goes stale, our leases and capabilities
50 * are no longer valid.
51 */
52
20cb34ae 53struct ceph_reconnect_state {
81c5a148
YZ
54 struct ceph_mds_session *session;
55 int nr_caps, nr_realms;
20cb34ae 56 struct ceph_pagelist *pagelist;
121f22a1 57 unsigned msg_version;
81c5a148 58 bool allow_multi;
20cb34ae
SW
59};
60
2f2dc053
SW
61static void __wake_requests(struct ceph_mds_client *mdsc,
62 struct list_head *head);
e3ec8d68 63static void ceph_cap_release_work(struct work_struct *work);
37c4efc1 64static void ceph_cap_reclaim_work(struct work_struct *work);
2f2dc053 65
9e32789f 66static const struct ceph_connection_operations mds_con_ops;
2f2dc053
SW
67
68
69/*
70 * mds reply parsing
71 */
72
b37fe1f9
YZ
73static int parse_reply_info_quota(void **p, void *end,
74 struct ceph_mds_reply_info_in *info)
75{
76 u8 struct_v, struct_compat;
77 u32 struct_len;
78
79 ceph_decode_8_safe(p, end, struct_v, bad);
80 ceph_decode_8_safe(p, end, struct_compat, bad);
81 /* struct_v is expected to be >= 1. we only
82 * understand encoding with struct_compat == 1. */
83 if (!struct_v || struct_compat != 1)
84 goto bad;
85 ceph_decode_32_safe(p, end, struct_len, bad);
86 ceph_decode_need(p, end, struct_len, bad);
87 end = *p + struct_len;
88 ceph_decode_64_safe(p, end, info->max_bytes, bad);
89 ceph_decode_64_safe(p, end, info->max_files, bad);
90 *p = end;
91 return 0;
92bad:
93 return -EIO;
94}
95
2f2dc053
SW
96/*
97 * parse individual inode info
98 */
99static int parse_reply_info_in(void **p, void *end,
14303d20 100 struct ceph_mds_reply_info_in *info,
12b4629a 101 u64 features)
2f2dc053 102{
b37fe1f9
YZ
103 int err = 0;
104 u8 struct_v = 0;
2f2dc053 105
b37fe1f9
YZ
106 if (features == (u64)-1) {
107 u32 struct_len;
108 u8 struct_compat;
109 ceph_decode_8_safe(p, end, struct_v, bad);
110 ceph_decode_8_safe(p, end, struct_compat, bad);
111 /* struct_v is expected to be >= 1. we only understand
112 * encoding with struct_compat == 1. */
113 if (!struct_v || struct_compat != 1)
114 goto bad;
115 ceph_decode_32_safe(p, end, struct_len, bad);
116 ceph_decode_need(p, end, struct_len, bad);
117 end = *p + struct_len;
118 }
119
120 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
2f2dc053
SW
121 info->in = *p;
122 *p += sizeof(struct ceph_mds_reply_inode) +
123 sizeof(*info->in->fragtree.splits) *
124 le32_to_cpu(info->in->fragtree.nsplits);
125
126 ceph_decode_32_safe(p, end, info->symlink_len, bad);
127 ceph_decode_need(p, end, info->symlink_len, bad);
128 info->symlink = *p;
129 *p += info->symlink_len;
130
23c625ce
ID
131 ceph_decode_copy_safe(p, end, &info->dir_layout,
132 sizeof(info->dir_layout), bad);
2f2dc053
SW
133 ceph_decode_32_safe(p, end, info->xattr_len, bad);
134 ceph_decode_need(p, end, info->xattr_len, bad);
135 info->xattr_data = *p;
136 *p += info->xattr_len;
fb01d1f8 137
b37fe1f9
YZ
138 if (features == (u64)-1) {
139 /* inline data */
fb01d1f8
YZ
140 ceph_decode_64_safe(p, end, info->inline_version, bad);
141 ceph_decode_32_safe(p, end, info->inline_len, bad);
142 ceph_decode_need(p, end, info->inline_len, bad);
143 info->inline_data = *p;
144 *p += info->inline_len;
b37fe1f9
YZ
145 /* quota */
146 err = parse_reply_info_quota(p, end, info);
147 if (err < 0)
148 goto out_bad;
149 /* pool namespace */
150 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
151 if (info->pool_ns_len > 0) {
152 ceph_decode_need(p, end, info->pool_ns_len, bad);
153 info->pool_ns_data = *p;
154 *p += info->pool_ns_len;
155 }
245ce991
JL
156
157 /* btime */
158 ceph_decode_need(p, end, sizeof(info->btime), bad);
159 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
160
161 /* change attribute */
a35ead31 162 ceph_decode_64_safe(p, end, info->change_attr, bad);
fb01d1f8 163
08796873
YZ
164 /* dir pin */
165 if (struct_v >= 2) {
166 ceph_decode_32_safe(p, end, info->dir_pin, bad);
167 } else {
168 info->dir_pin = -ENODATA;
169 }
170
193e7b37
DD
171 /* snapshot birth time, remains zero for v<=2 */
172 if (struct_v >= 3) {
173 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
174 ceph_decode_copy(p, &info->snap_btime,
175 sizeof(info->snap_btime));
176 } else {
177 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
178 }
179
e7f72952
YC
180 /* snapshot count, remains zero for v<=3 */
181 if (struct_v >= 4) {
182 ceph_decode_64_safe(p, end, info->rsnaps, bad);
183 } else {
184 info->rsnaps = 0;
185 }
186
b37fe1f9
YZ
187 *p = end;
188 } else {
189 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
190 ceph_decode_64_safe(p, end, info->inline_version, bad);
191 ceph_decode_32_safe(p, end, info->inline_len, bad);
192 ceph_decode_need(p, end, info->inline_len, bad);
193 info->inline_data = *p;
194 *p += info->inline_len;
195 } else
196 info->inline_version = CEPH_INLINE_NONE;
197
198 if (features & CEPH_FEATURE_MDS_QUOTA) {
199 err = parse_reply_info_quota(p, end, info);
200 if (err < 0)
201 goto out_bad;
202 } else {
203 info->max_bytes = 0;
204 info->max_files = 0;
205 }
206
207 info->pool_ns_len = 0;
208 info->pool_ns_data = NULL;
209 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
210 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
211 if (info->pool_ns_len > 0) {
212 ceph_decode_need(p, end, info->pool_ns_len, bad);
213 info->pool_ns_data = *p;
214 *p += info->pool_ns_len;
215 }
216 }
08796873 217
245ce991
JL
218 if (features & CEPH_FEATURE_FS_BTIME) {
219 ceph_decode_need(p, end, sizeof(info->btime), bad);
220 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
a35ead31 221 ceph_decode_64_safe(p, end, info->change_attr, bad);
245ce991
JL
222 }
223
08796873 224 info->dir_pin = -ENODATA;
e7f72952 225 /* info->snap_btime and info->rsnaps remain zero */
b37fe1f9
YZ
226 }
227 return 0;
228bad:
229 err = -EIO;
230out_bad:
231 return err;
232}
233
234static int parse_reply_info_dir(void **p, void *end,
235 struct ceph_mds_reply_dirfrag **dirfrag,
236 u64 features)
237{
238 if (features == (u64)-1) {
fb18a575
LH
239 u8 struct_v, struct_compat;
240 u32 struct_len;
fb18a575
LH
241 ceph_decode_8_safe(p, end, struct_v, bad);
242 ceph_decode_8_safe(p, end, struct_compat, bad);
b37fe1f9
YZ
243 /* struct_v is expected to be >= 1. we only understand
244 * encoding whose struct_compat == 1. */
245 if (!struct_v || struct_compat != 1)
fb18a575
LH
246 goto bad;
247 ceph_decode_32_safe(p, end, struct_len, bad);
248 ceph_decode_need(p, end, struct_len, bad);
b37fe1f9 249 end = *p + struct_len;
fb18a575
LH
250 }
251
b37fe1f9
YZ
252 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
253 *dirfrag = *p;
254 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
255 if (unlikely(*p > end))
256 goto bad;
257 if (features == (u64)-1)
258 *p = end;
259 return 0;
260bad:
261 return -EIO;
262}
263
264static int parse_reply_info_lease(void **p, void *end,
265 struct ceph_mds_reply_lease **lease,
266 u64 features)
267{
268 if (features == (u64)-1) {
269 u8 struct_v, struct_compat;
270 u32 struct_len;
271 ceph_decode_8_safe(p, end, struct_v, bad);
272 ceph_decode_8_safe(p, end, struct_compat, bad);
273 /* struct_v is expected to be >= 1. we only understand
274 * encoding whose struct_compat == 1. */
275 if (!struct_v || struct_compat != 1)
276 goto bad;
277 ceph_decode_32_safe(p, end, struct_len, bad);
278 ceph_decode_need(p, end, struct_len, bad);
279 end = *p + struct_len;
5ea5c5e0
YZ
280 }
281
b37fe1f9
YZ
282 ceph_decode_need(p, end, sizeof(**lease), bad);
283 *lease = *p;
284 *p += sizeof(**lease);
285 if (features == (u64)-1)
286 *p = end;
2f2dc053
SW
287 return 0;
288bad:
b37fe1f9 289 return -EIO;
2f2dc053
SW
290}
291
292/*
293 * parse a normal reply, which may contain a (dir+)dentry and/or a
294 * target inode.
295 */
296static int parse_reply_info_trace(void **p, void *end,
14303d20 297 struct ceph_mds_reply_info_parsed *info,
12b4629a 298 u64 features)
2f2dc053
SW
299{
300 int err;
301
302 if (info->head->is_dentry) {
14303d20 303 err = parse_reply_info_in(p, end, &info->diri, features);
2f2dc053
SW
304 if (err < 0)
305 goto out_bad;
306
b37fe1f9
YZ
307 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
308 if (err < 0)
309 goto out_bad;
2f2dc053
SW
310
311 ceph_decode_32_safe(p, end, info->dname_len, bad);
312 ceph_decode_need(p, end, info->dname_len, bad);
313 info->dname = *p;
314 *p += info->dname_len;
b37fe1f9
YZ
315
316 err = parse_reply_info_lease(p, end, &info->dlease, features);
317 if (err < 0)
318 goto out_bad;
2f2dc053
SW
319 }
320
321 if (info->head->is_target) {
14303d20 322 err = parse_reply_info_in(p, end, &info->targeti, features);
2f2dc053
SW
323 if (err < 0)
324 goto out_bad;
325 }
326
327 if (unlikely(*p != end))
328 goto bad;
329 return 0;
330
331bad:
332 err = -EIO;
333out_bad:
334 pr_err("problem parsing mds trace %d\n", err);
335 return err;
336}
337
338/*
339 * parse readdir results
340 */
b37fe1f9 341static int parse_reply_info_readdir(void **p, void *end,
14303d20 342 struct ceph_mds_reply_info_parsed *info,
12b4629a 343 u64 features)
2f2dc053
SW
344{
345 u32 num, i = 0;
346 int err;
347
b37fe1f9
YZ
348 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
349 if (err < 0)
350 goto out_bad;
2f2dc053
SW
351
352 ceph_decode_need(p, end, sizeof(num) + 2, bad);
c89136ea 353 num = ceph_decode_32(p);
956d39d6
YZ
354 {
355 u16 flags = ceph_decode_16(p);
356 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
357 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
f3c4ebe6 358 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
79162547 359 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
956d39d6 360 }
2f2dc053
SW
361 if (num == 0)
362 goto done;
363
2a5beea3
YZ
364 BUG_ON(!info->dir_entries);
365 if ((unsigned long)(info->dir_entries + num) >
366 (unsigned long)info->dir_entries + info->dir_buf_size) {
54008399
YZ
367 pr_err("dir contents are larger than expected\n");
368 WARN_ON(1);
369 goto bad;
370 }
2f2dc053 371
54008399 372 info->dir_nr = num;
2f2dc053 373 while (num) {
2a5beea3 374 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
2f2dc053 375 /* dentry */
b37fe1f9 376 ceph_decode_32_safe(p, end, rde->name_len, bad);
2a5beea3
YZ
377 ceph_decode_need(p, end, rde->name_len, bad);
378 rde->name = *p;
379 *p += rde->name_len;
380 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
2f2dc053 381
b37fe1f9
YZ
382 /* dentry lease */
383 err = parse_reply_info_lease(p, end, &rde->lease, features);
384 if (err)
385 goto out_bad;
2f2dc053 386 /* inode */
2a5beea3 387 err = parse_reply_info_in(p, end, &rde->inode, features);
2f2dc053
SW
388 if (err < 0)
389 goto out_bad;
8974eebd
YZ
390 /* ceph_readdir_prepopulate() will update it */
391 rde->offset = 0;
2f2dc053
SW
392 i++;
393 num--;
394 }
395
396done:
1d3f8723
JL
397 /* Skip over any unrecognized fields */
398 *p = end;
2f2dc053
SW
399 return 0;
400
401bad:
402 err = -EIO;
403out_bad:
404 pr_err("problem parsing dir contents %d\n", err);
405 return err;
406}
407
25933abd
HS
408/*
409 * parse fcntl F_GETLK results
410 */
411static int parse_reply_info_filelock(void **p, void *end,
14303d20 412 struct ceph_mds_reply_info_parsed *info,
12b4629a 413 u64 features)
25933abd
HS
414{
415 if (*p + sizeof(*info->filelock_reply) > end)
416 goto bad;
417
418 info->filelock_reply = *p;
25933abd 419
1d3f8723
JL
420 /* Skip over any unrecognized fields */
421 *p = end;
25933abd 422 return 0;
25933abd
HS
423bad:
424 return -EIO;
425}
426
d4846487
JL
427
428#if BITS_PER_LONG == 64
429
430#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
431
432static int ceph_parse_deleg_inos(void **p, void *end,
433 struct ceph_mds_session *s)
434{
435 u32 sets;
436
437 ceph_decode_32_safe(p, end, sets, bad);
438 dout("got %u sets of delegated inodes\n", sets);
439 while (sets--) {
2ecd0edd 440 u64 start, len;
d4846487
JL
441
442 ceph_decode_64_safe(p, end, start, bad);
443 ceph_decode_64_safe(p, end, len, bad);
d4f6b31d
JL
444
445 /* Don't accept a delegation of system inodes */
446 if (start < CEPH_INO_SYSTEM_BASE) {
447 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
448 start, len);
449 continue;
450 }
d4846487 451 while (len--) {
2ecd0edd 452 int err = xa_insert(&s->s_delegated_inos, start++,
d4846487
JL
453 DELEGATED_INO_AVAILABLE,
454 GFP_KERNEL);
455 if (!err) {
456 dout("added delegated inode 0x%llx\n",
457 start - 1);
458 } else if (err == -EBUSY) {
4868e537 459 pr_warn("MDS delegated inode 0x%llx more than once.\n",
d4846487
JL
460 start - 1);
461 } else {
462 return err;
463 }
464 }
465 }
466 return 0;
467bad:
468 return -EIO;
469}
470
471u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
472{
473 unsigned long ino;
474 void *val;
475
476 xa_for_each(&s->s_delegated_inos, ino, val) {
477 val = xa_erase(&s->s_delegated_inos, ino);
478 if (val == DELEGATED_INO_AVAILABLE)
479 return ino;
480 }
481 return 0;
482}
483
484int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
485{
486 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
487 GFP_KERNEL);
488}
489#else /* BITS_PER_LONG == 64 */
490/*
491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
493 * and bottom words?
494 */
495static int ceph_parse_deleg_inos(void **p, void *end,
496 struct ceph_mds_session *s)
497{
498 u32 sets;
499
500 ceph_decode_32_safe(p, end, sets, bad);
501 if (sets)
502 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
503 return 0;
504bad:
505 return -EIO;
506}
507
508u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
509{
510 return 0;
511}
512
513int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
514{
515 return 0;
516}
517#endif /* BITS_PER_LONG == 64 */
518
6e8575fa
SL
519/*
520 * parse create results
521 */
522static int parse_reply_info_create(void **p, void *end,
523 struct ceph_mds_reply_info_parsed *info,
d4846487 524 u64 features, struct ceph_mds_session *s)
6e8575fa 525{
d4846487
JL
526 int ret;
527
b37fe1f9
YZ
528 if (features == (u64)-1 ||
529 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
6e8575fa 530 if (*p == end) {
d4846487 531 /* Malformed reply? */
6e8575fa 532 info->has_create_ino = false;
d4846487 533 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
6e8575fa 534 info->has_create_ino = true;
06a1ad43
JL
535 /* struct_v, struct_compat, and len */
536 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
d4846487
JL
537 ceph_decode_64_safe(p, end, info->ino, bad);
538 ret = ceph_parse_deleg_inos(p, end, s);
539 if (ret)
540 return ret;
541 } else {
542 /* legacy */
1d3f8723 543 ceph_decode_64_safe(p, end, info->ino, bad);
d4846487 544 info->has_create_ino = true;
6e8575fa 545 }
1d3f8723
JL
546 } else {
547 if (*p != end)
548 goto bad;
6e8575fa
SL
549 }
550
1d3f8723
JL
551 /* Skip over any unrecognized fields */
552 *p = end;
6e8575fa 553 return 0;
6e8575fa
SL
554bad:
555 return -EIO;
556}
557
6ddf5f16
MC
558static int parse_reply_info_getvxattr(void **p, void *end,
559 struct ceph_mds_reply_info_parsed *info,
560 u64 features)
561{
562 u32 value_len;
563
564 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
565 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
566 ceph_decode_skip_32(p, end, bad); /* skip payload length */
567
568 ceph_decode_32_safe(p, end, value_len, bad);
569
570 if (value_len == end - *p) {
571 info->xattr_info.xattr_value = *p;
572 info->xattr_info.xattr_value_len = value_len;
573 *p = end;
574 return value_len;
575 }
576bad:
577 return -EIO;
578}
579
25933abd
HS
580/*
581 * parse extra results
582 */
583static int parse_reply_info_extra(void **p, void *end,
14303d20 584 struct ceph_mds_reply_info_parsed *info,
d4846487 585 u64 features, struct ceph_mds_session *s)
25933abd 586{
6df8c9d8
JL
587 u32 op = le32_to_cpu(info->head->op);
588
589 if (op == CEPH_MDS_OP_GETFILELOCK)
14303d20 590 return parse_reply_info_filelock(p, end, info, features);
6df8c9d8 591 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
b37fe1f9 592 return parse_reply_info_readdir(p, end, info, features);
6df8c9d8 593 else if (op == CEPH_MDS_OP_CREATE)
d4846487 594 return parse_reply_info_create(p, end, info, features, s);
6ddf5f16
MC
595 else if (op == CEPH_MDS_OP_GETVXATTR)
596 return parse_reply_info_getvxattr(p, end, info, features);
6e8575fa
SL
597 else
598 return -EIO;
25933abd
HS
599}
600
2f2dc053
SW
601/*
602 * parse entire mds reply
603 */
d4846487 604static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
14303d20 605 struct ceph_mds_reply_info_parsed *info,
12b4629a 606 u64 features)
2f2dc053
SW
607{
608 void *p, *end;
609 u32 len;
610 int err;
611
612 info->head = msg->front.iov_base;
613 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
614 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
615
616 /* trace */
617 ceph_decode_32_safe(&p, end, len, bad);
618 if (len > 0) {
32852a81 619 ceph_decode_need(&p, end, len, bad);
14303d20 620 err = parse_reply_info_trace(&p, p+len, info, features);
2f2dc053
SW
621 if (err < 0)
622 goto out_bad;
623 }
624
25933abd 625 /* extra */
2f2dc053
SW
626 ceph_decode_32_safe(&p, end, len, bad);
627 if (len > 0) {
32852a81 628 ceph_decode_need(&p, end, len, bad);
d4846487 629 err = parse_reply_info_extra(&p, p+len, info, features, s);
2f2dc053
SW
630 if (err < 0)
631 goto out_bad;
632 }
633
634 /* snap blob */
635 ceph_decode_32_safe(&p, end, len, bad);
636 info->snapblob_len = len;
637 info->snapblob = p;
638 p += len;
639
640 if (p != end)
641 goto bad;
642 return 0;
643
644bad:
645 err = -EIO;
646out_bad:
647 pr_err("mds parse_reply err %d\n", err);
648 return err;
649}
650
651static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
652{
2a5beea3 653 if (!info->dir_entries)
54008399 654 return;
2a5beea3 655 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
2f2dc053
SW
656}
657
4868e537
XL
658/*
659 * In async unlink case the kclient won't wait for the first reply
660 * from MDS and just drop all the links and unhash the dentry and then
661 * succeeds immediately.
662 *
663 * For any new create/link/rename,etc requests followed by using the
664 * same file names we must wait for the first reply of the inflight
665 * unlink request, or the MDS possibly will fail these following
666 * requests with -EEXIST if the inflight async unlink request was
667 * delayed for some reasons.
668 *
669 * And the worst case is that for the none async openc request it will
670 * successfully open the file if the CDentry hasn't been unlinked yet,
671 * but later the previous delayed async unlink request will remove the
672 * CDenty. That means the just created file is possiblly deleted later
673 * by accident.
674 *
675 * We need to wait for the inflight async unlink requests to finish
676 * when creating new files/directories by using the same file names.
677 */
678int ceph_wait_on_conflict_unlink(struct dentry *dentry)
679{
680 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
681 struct dentry *pdentry = dentry->d_parent;
682 struct dentry *udentry, *found = NULL;
683 struct ceph_dentry_info *di;
684 struct qstr dname;
685 u32 hash = dentry->d_name.hash;
686 int err;
687
688 dname.name = dentry->d_name.name;
689 dname.len = dentry->d_name.len;
690
691 rcu_read_lock();
692 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
693 hnode, hash) {
694 udentry = di->dentry;
695
696 spin_lock(&udentry->d_lock);
697 if (udentry->d_name.hash != hash)
698 goto next;
699 if (unlikely(udentry->d_parent != pdentry))
700 goto next;
701 if (!hash_hashed(&di->hnode))
702 goto next;
703
704 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
705 pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
706 __func__, dentry, dentry);
707
708 if (!d_same_name(udentry, pdentry, &dname))
709 goto next;
710
711 spin_unlock(&udentry->d_lock);
712 found = dget(udentry);
713 break;
714next:
715 spin_unlock(&udentry->d_lock);
716 }
717 rcu_read_unlock();
718
719 if (likely(!found))
720 return 0;
721
722 dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
723 dentry, dentry, found, found);
724
725 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
726 TASK_KILLABLE);
727 dput(found);
728 return err;
729}
730
2f2dc053
SW
731
732/*
733 * sessions
734 */
a687ecaf 735const char *ceph_session_state_name(int s)
2f2dc053
SW
736{
737 switch (s) {
738 case CEPH_MDS_SESSION_NEW: return "new";
739 case CEPH_MDS_SESSION_OPENING: return "opening";
740 case CEPH_MDS_SESSION_OPEN: return "open";
741 case CEPH_MDS_SESSION_HUNG: return "hung";
742 case CEPH_MDS_SESSION_CLOSING: return "closing";
4d681c2f 743 case CEPH_MDS_SESSION_CLOSED: return "closed";
44ca18f2 744 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
2f2dc053 745 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
fcff415c 746 case CEPH_MDS_SESSION_REJECTED: return "rejected";
2f2dc053
SW
747 default: return "???";
748 }
749}
750
5b3248c6 751struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
2f2dc053 752{
9f358999 753 if (refcount_inc_not_zero(&s->s_ref))
2f2dc053 754 return s;
9f358999 755 return NULL;
2f2dc053
SW
756}
757
758void ceph_put_mds_session(struct ceph_mds_session *s)
759{
7e65624d
JL
760 if (IS_ERR_OR_NULL(s))
761 return;
762
3997c01d 763 if (refcount_dec_and_test(&s->s_ref)) {
6c4a1915 764 if (s->s_auth.authorizer)
6c1ea260 765 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
88828190 766 WARN_ON(mutex_is_locked(&s->s_mutex));
d4846487 767 xa_destroy(&s->s_delegated_inos);
2f2dc053 768 kfree(s);
4e7a5dcd 769 }
2f2dc053
SW
770}
771
772/*
773 * called under mdsc->mutex
774 */
775struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
776 int mds)
777{
d37b1d99 778 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
2f2dc053 779 return NULL;
5b3248c6 780 return ceph_get_mds_session(mdsc->sessions[mds]);
2f2dc053
SW
781}
782
783static bool __have_session(struct ceph_mds_client *mdsc, int mds)
784{
98cfda81 785 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
2f2dc053 786 return false;
98cfda81
CX
787 else
788 return true;
2f2dc053
SW
789}
790
2600d2dd
SW
791static int __verify_registered_session(struct ceph_mds_client *mdsc,
792 struct ceph_mds_session *s)
793{
794 if (s->s_mds >= mdsc->max_sessions ||
795 mdsc->sessions[s->s_mds] != s)
796 return -ENOENT;
797 return 0;
798}
799
2f2dc053
SW
800/*
801 * create+register a new session for given mds.
802 * called under mdsc->mutex.
803 */
804static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
805 int mds)
806{
807 struct ceph_mds_session *s;
808
a68e564a
XL
809 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
810 return ERR_PTR(-EIO);
811
b38c9eb4 812 if (mds >= mdsc->mdsmap->possible_max_rank)
c338c07c
NY
813 return ERR_PTR(-EINVAL);
814
2f2dc053 815 s = kzalloc(sizeof(*s), GFP_NOFS);
4736b009
DC
816 if (!s)
817 return ERR_PTR(-ENOMEM);
47474d0b
CX
818
819 if (mds >= mdsc->max_sessions) {
820 int newmax = 1 << get_count_order(mds + 1);
821 struct ceph_mds_session **sa;
822
823 dout("%s: realloc to %d\n", __func__, newmax);
824 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
825 if (!sa)
826 goto fail_realloc;
827 if (mdsc->sessions) {
828 memcpy(sa, mdsc->sessions,
829 mdsc->max_sessions * sizeof(void *));
830 kfree(mdsc->sessions);
831 }
832 mdsc->sessions = sa;
833 mdsc->max_sessions = newmax;
834 }
835
836 dout("%s: mds%d\n", __func__, mds);
2f2dc053
SW
837 s->s_mdsc = mdsc;
838 s->s_mds = mds;
839 s->s_state = CEPH_MDS_SESSION_NEW;
2f2dc053
SW
840 mutex_init(&s->s_mutex);
841
b7a9e5dd 842 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
2f2dc053 843
52d60f8e 844 atomic_set(&s->s_cap_gen, 1);
1ce208a6 845 s->s_cap_ttl = jiffies - 1;
d8fb02ab
AE
846
847 spin_lock_init(&s->s_cap_lock);
2f2dc053 848 INIT_LIST_HEAD(&s->s_caps);
3997c01d 849 refcount_set(&s->s_ref, 1);
2f2dc053
SW
850 INIT_LIST_HEAD(&s->s_waiting);
851 INIT_LIST_HEAD(&s->s_unsafe);
d4846487 852 xa_init(&s->s_delegated_inos);
2f2dc053 853 INIT_LIST_HEAD(&s->s_cap_releases);
e3ec8d68
YZ
854 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
855
1cf03a68 856 INIT_LIST_HEAD(&s->s_cap_dirty);
2f2dc053 857 INIT_LIST_HEAD(&s->s_cap_flushing);
2f2dc053 858
2f2dc053 859 mdsc->sessions[mds] = s;
86d8f67b 860 atomic_inc(&mdsc->num_sessions);
3997c01d 861 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
42ce56e5 862
b7a9e5dd
SW
863 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
864 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
42ce56e5 865
2f2dc053 866 return s;
42ce56e5
SW
867
868fail_realloc:
869 kfree(s);
870 return ERR_PTR(-ENOMEM);
2f2dc053
SW
871}
872
873/*
874 * called under mdsc->mutex
875 */
2600d2dd 876static void __unregister_session(struct ceph_mds_client *mdsc,
42ce56e5 877 struct ceph_mds_session *s)
2f2dc053 878{
2600d2dd
SW
879 dout("__unregister_session mds%d %p\n", s->s_mds, s);
880 BUG_ON(mdsc->sessions[s->s_mds] != s);
42ce56e5
SW
881 mdsc->sessions[s->s_mds] = NULL;
882 ceph_con_close(&s->s_con);
883 ceph_put_mds_session(s);
86d8f67b 884 atomic_dec(&mdsc->num_sessions);
2f2dc053
SW
885}
886
887/*
888 * drop session refs in request.
889 *
890 * should be last request ref, or hold mdsc->mutex
891 */
892static void put_request_session(struct ceph_mds_request *req)
893{
894 if (req->r_session) {
895 ceph_put_mds_session(req->r_session);
896 req->r_session = NULL;
897 }
898}
899
59b312f3
XL
900void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
901 void (*cb)(struct ceph_mds_session *),
902 bool check_state)
903{
904 int mds;
905
906 mutex_lock(&mdsc->mutex);
907 for (mds = 0; mds < mdsc->max_sessions; ++mds) {
908 struct ceph_mds_session *s;
909
910 s = __ceph_lookup_mds_session(mdsc, mds);
911 if (!s)
912 continue;
913
914 if (check_state && !check_session_state(s)) {
915 ceph_put_mds_session(s);
916 continue;
917 }
918
919 mutex_unlock(&mdsc->mutex);
920 cb(s);
921 ceph_put_mds_session(s);
922 mutex_lock(&mdsc->mutex);
923 }
924 mutex_unlock(&mdsc->mutex);
925}
926
153c8e6b 927void ceph_mdsc_release_request(struct kref *kref)
2f2dc053 928{
153c8e6b
SW
929 struct ceph_mds_request *req = container_of(kref,
930 struct ceph_mds_request,
931 r_kref);
e64f44a8 932 ceph_mdsc_release_dir_caps_no_check(req);
54008399 933 destroy_reply_info(&req->r_reply_info);
153c8e6b
SW
934 if (req->r_request)
935 ceph_msg_put(req->r_request);
54008399 936 if (req->r_reply)
153c8e6b 937 ceph_msg_put(req->r_reply);
153c8e6b 938 if (req->r_inode) {
41b02e1f 939 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
23c2c76e 940 iput(req->r_inode);
153c8e6b 941 }
9c1c2b35 942 if (req->r_parent) {
3dd69aab 943 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
23c2c76e 944 iput(req->r_parent);
9c1c2b35 945 }
23c2c76e 946 iput(req->r_target_inode);
153c8e6b
SW
947 if (req->r_dentry)
948 dput(req->r_dentry);
844d87c3
SW
949 if (req->r_old_dentry)
950 dput(req->r_old_dentry);
951 if (req->r_old_dentry_dir) {
41b02e1f
SW
952 /*
953 * track (and drop pins for) r_old_dentry_dir
954 * separately, since r_old_dentry's d_parent may have
955 * changed between the dir mutex being dropped and
956 * this request being freed.
957 */
958 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
959 CEPH_CAP_PIN);
23c2c76e 960 iput(req->r_old_dentry_dir);
2f2dc053 961 }
153c8e6b
SW
962 kfree(req->r_path1);
963 kfree(req->r_path2);
7fe0cdeb 964 put_cred(req->r_cred);
25e6bae3
YZ
965 if (req->r_pagelist)
966 ceph_pagelist_release(req->r_pagelist);
153c8e6b 967 put_request_session(req);
37151668 968 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
428138c9 969 WARN_ON_ONCE(!list_empty(&req->r_wait));
058daab7 970 kmem_cache_free(ceph_mds_request_cachep, req);
2f2dc053
SW
971}
972
fcd00b68
ID
973DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
974
2f2dc053
SW
975/*
976 * lookup session, bump ref if found.
977 *
978 * called under mdsc->mutex.
979 */
fcd00b68
ID
980static struct ceph_mds_request *
981lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
2f2dc053
SW
982{
983 struct ceph_mds_request *req;
44ca18f2 984
fcd00b68
ID
985 req = lookup_request(&mdsc->request_tree, tid);
986 if (req)
987 ceph_mdsc_get_request(req);
44ca18f2 988
fcd00b68 989 return req;
2f2dc053
SW
990}
991
992/*
993 * Register an in-flight request, and assign a tid. Link to directory
994 * are modifying (if any).
995 *
996 * Called under mdsc->mutex.
997 */
998static void __register_request(struct ceph_mds_client *mdsc,
999 struct ceph_mds_request *req,
1000 struct inode *dir)
1001{
e30ee581
ZZ
1002 int ret = 0;
1003
2f2dc053 1004 req->r_tid = ++mdsc->last_tid;
e30ee581
ZZ
1005 if (req->r_num_caps) {
1006 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
1007 req->r_num_caps);
1008 if (ret < 0) {
1009 pr_err("__register_request %p "
1010 "failed to reserve caps: %d\n", req, ret);
1011 /* set req->r_err to fail early from __do_request */
1012 req->r_err = ret;
1013 return;
1014 }
1015 }
2f2dc053
SW
1016 dout("__register_request %p tid %lld\n", req, req->r_tid);
1017 ceph_mdsc_get_request(req);
fcd00b68 1018 insert_request(&mdsc->request_tree, req);
2f2dc053 1019
7fe0cdeb 1020 req->r_cred = get_current_cred();
cb4276cc 1021
e8a7b8b1
YZ
1022 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
1023 mdsc->oldest_tid = req->r_tid;
1024
2f2dc053 1025 if (dir) {
3db0a2fc
JL
1026 struct ceph_inode_info *ci = ceph_inode(dir);
1027
3b663780 1028 ihold(dir);
2f2dc053 1029 req->r_unsafe_dir = dir;
3db0a2fc
JL
1030 spin_lock(&ci->i_unsafe_lock);
1031 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
1032 spin_unlock(&ci->i_unsafe_lock);
2f2dc053
SW
1033 }
1034}
1035
/*
 * Undo __register_request: drop the request from the tid tree, the
 * per-session unsafe list, and any per-inode unsafe lists, then put
 * the registration reference.
 *
 * Called under mdsc->mutex.
 */
static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	/* advance oldest_tid past this request, skipping setfilelock ops */
	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	/* drop the dir reference taken in __register_request */
	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}
1083
/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 *
 * Returns a referenced inode (via igrab), or NULL.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		/* d_inode_rcu: safe read of ->d_inode under rcu_read_lock */
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}
1105
2f2dc053
SW
/*
 * Choose mds to send request to next. If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds. If all else fails, choose randomly.
 *
 * If @random is non-NULL, *random is set to true when the random
 * fallback was taken.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try? ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/* pick an inode whose caps/frag tree will guide the choice */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	/* for hashed dir operations, consult the fragment tree */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	/* fall back to whichever mds holds (preferably the auth) cap */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}
1266
1267
1268/*
1269 * session messages
1270 */
fba97e80 1271struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
2f2dc053
SW
1272{
1273 struct ceph_msg *msg;
1274 struct ceph_mds_session_head *h;
1275
b61c2763
SW
1276 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1277 false);
a79832f2 1278 if (!msg) {
fba97e80
XL
1279 pr_err("ENOMEM creating session %s msg\n",
1280 ceph_session_op_name(op));
a79832f2 1281 return NULL;
2f2dc053
SW
1282 }
1283 h = msg->front.iov_base;
1284 h->op = cpu_to_le32(op);
1285 h->seq = cpu_to_le64(seq);
dbd0c8bf
JS
1286
1287 return msg;
1288}
1289
9ba1e224
XL
/* client feature bit numbers we advertise at session open */
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
/* bytes needed for the bitmap, rounded up to whole 64-bit words */
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
/*
 * Encode the supported-feature bitmap (length-prefixed) at *p,
 * advancing *p. Returns 0 or -ERANGE if the buffer would overflow.
 */
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			/* set bit number 'bit' in the little-endian bitmap */
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		/* empty bitmap: just a zero length */
		ceph_encode_32(p, 0);
	}

	return 0;
}
1320
3b4168dd
XL
1321static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1322#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1323static int encode_metric_spec(void **p, void *end)
1324{
1325 static const size_t count = ARRAY_SIZE(metric_bits);
1326
1327 /* header */
1328 if (WARN_ON_ONCE(*p + 2 > end))
1329 return -ERANGE;
1330
1331 ceph_encode_8(p, 1); /* version */
1332 ceph_encode_8(p, 1); /* compat */
1333
1334 if (count > 0) {
1335 size_t i;
1336 size_t size = METRIC_BYTES(count);
1337
1338 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1339 return -ERANGE;
1340
1341 /* metric spec info length */
1342 ceph_encode_32(p, 4 + size);
1343
1344 /* metric spec */
1345 ceph_encode_32(p, size);
1346 memset(*p, 0, size);
1347 for (i = 0; i < count; i++)
1348 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1349 *p += size;
1350 } else {
1351 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1352 return -ERANGE;
1353
1354 /* metric spec info length */
1355 ceph_encode_32(p, 4);
1356 /* metric spec */
1357 ceph_encode_32(p, 0);
1358 }
1359
1360 return 0;
1361}
1362
dbd0c8bf
JS
/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 *
 * Returns the message or an ERR_PTR on allocation/encoding failure.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	size_t size, count;
	void *p, *end;
	int ret;

	/* NULL-terminated key/value table of client metadata */
	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		/* 8 = two 32-bit string length prefixes per entry */
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec: version+compat bytes, info length, bitmap length */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v4
	 */
	msg->hdr.version = cpu_to_le16(4);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err("encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err("encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* trim the front length to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
1471
/*
 * send session open request.
 *
 * called under mdsc->mutex
 *
 * Returns 0, -EIO when I/O is fenced, or an error from message
 * creation.
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	/* refuse to start new sessions once I/O has been fenced off */
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_open_msg(mdsc, session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}
1501
ed0552a1
SW
/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 *
 * Looks up (or registers) the session for @target and kicks off an
 * open if it is new or closing. Returns the session or an ERR_PTR.
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}
1528
1529struct ceph_mds_session *
1530ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1531{
1532 struct ceph_mds_session *session;
1533
1534 dout("open_export_target_session to mds%d\n", target);
1535
1536 mutex_lock(&mdsc->mutex);
1537 session = __open_export_target_session(mdsc, target);
1538 mutex_unlock(&mdsc->mutex);
1539
1540 return session;
1541}
1542
ed0552a1
SW
/*
 * Open a session to each export target listed in the mdsmap for
 * @session's mds, dropping the reference returned for each.
 *
 * called under mdsc->mutex
 */
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;

	/* mds rank may exceed what the current mdsmap describes */
	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	dout("open_export_target_sessions for mds%d (%d targets)\n",
	     session->s_mds, mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}
1562
154f42c2
SW
/* Locked wrapper: open export-target sessions under mdsc->mutex. */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}
1570
2f2dc053
SW
1571/*
1572 * session caps
1573 */
1574
c8a96a31
JL
/*
 * Move all pending cap releases off the session onto @target and
 * reset the count. Caller must hold s_cap_lock (asserted below);
 * the detached list is later freed by dispose_cap_releases().
 */
static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	/* NOTE: historical name in the debug output; this only detaches */
	dout("dispose_cap_releases mds%d\n", session->s_mds);
}
2f2dc053 1584
c8a96a31
JL
1585static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1586 struct list_head *dispose)
1587{
1588 while (!list_empty(dispose)) {
745a8e3b
YZ
1589 struct ceph_cap *cap;
1590 /* zero out the in-progress message */
c8a96a31 1591 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
745a8e3b
YZ
1592 list_del(&cap->session_caps);
1593 ceph_put_cap(mdsc, cap);
2f2dc053 1594 }
2f2dc053
SW
1595}
1596
1c841a96
YZ
/*
 * Drop all unsafe (unacked) requests attached to @session, marking
 * the affected mappings with -EIO, and zero r_attempts on every
 * request bound to this mds so kick_requests() will re-send them.
 */
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		/* surface the data loss to anyone fsync()ing these inodes */
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}
1627
2f2dc053 1628/*
f818a736
SW
1629 * Helper to safely iterate over all caps associated with a session, with
1630 * special care taken to handle a racing __ceph_remove_cap().
2f2dc053 1631 *
f818a736 1632 * Caller must hold session s_mutex.
2f2dc053 1633 */
f5d77269 1634int ceph_iterate_session_caps(struct ceph_mds_session *session,
aaf67de7
XL
1635 int (*cb)(struct inode *, int mds, void *),
1636 void *arg)
2f2dc053 1637{
7c1332b8
SW
1638 struct list_head *p;
1639 struct ceph_cap *cap;
1640 struct inode *inode, *last_inode = NULL;
1641 struct ceph_cap *old_cap = NULL;
2f2dc053
SW
1642 int ret;
1643
1644 dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1645 spin_lock(&session->s_cap_lock);
7c1332b8
SW
1646 p = session->s_caps.next;
1647 while (p != &session->s_caps) {
aaf67de7
XL
1648 int mds;
1649
7c1332b8 1650 cap = list_entry(p, struct ceph_cap, session_caps);
874c8ca1 1651 inode = igrab(&cap->ci->netfs.inode);
7c1332b8
SW
1652 if (!inode) {
1653 p = p->next;
2f2dc053 1654 continue;
7c1332b8
SW
1655 }
1656 session->s_cap_iterator = cap;
aaf67de7 1657 mds = cap->mds;
2f2dc053 1658 spin_unlock(&session->s_cap_lock);
7c1332b8
SW
1659
1660 if (last_inode) {
23c2c76e 1661 iput(last_inode);
7c1332b8
SW
1662 last_inode = NULL;
1663 }
1664 if (old_cap) {
37151668 1665 ceph_put_cap(session->s_mdsc, old_cap);
7c1332b8
SW
1666 old_cap = NULL;
1667 }
1668
aaf67de7 1669 ret = cb(inode, mds, arg);
7c1332b8
SW
1670 last_inode = inode;
1671
2f2dc053 1672 spin_lock(&session->s_cap_lock);
7c1332b8 1673 p = p->next;
d37b1d99 1674 if (!cap->ci) {
7c1332b8
SW
1675 dout("iterate_session_caps finishing cap %p removal\n",
1676 cap);
1677 BUG_ON(cap->session != session);
745a8e3b 1678 cap->session = NULL;
7c1332b8
SW
1679 list_del_init(&cap->session_caps);
1680 session->s_nr_caps--;
4f1d756d 1681 atomic64_dec(&session->s_mdsc->metric.total_caps);
e3ec8d68
YZ
1682 if (cap->queue_release)
1683 __ceph_queue_cap_release(session, cap);
1684 else
745a8e3b 1685 old_cap = cap; /* put_cap it w/o locks held */
7c1332b8 1686 }
5dacf091
SW
1687 if (ret < 0)
1688 goto out;
2f2dc053 1689 }
5dacf091
SW
1690 ret = 0;
1691out:
7c1332b8 1692 session->s_cap_iterator = NULL;
2f2dc053 1693 spin_unlock(&session->s_cap_lock);
7c1332b8 1694
23c2c76e 1695 iput(last_inode);
7c1332b8 1696 if (old_cap)
37151668 1697 ceph_put_cap(session->s_mdsc, old_cap);
7c1332b8 1698
5dacf091 1699 return ret;
2f2dc053
SW
1700}
1701
/*
 * Per-cap callback for remove_session_caps(): purge the cap held by
 * @mds on @inode, waking waiters and queueing page invalidation as
 * ceph_purge_inode_cap() directs. Always returns 0 (keep iterating).
 */
static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		dout(" removing cap %p, ci is %p, inode is %p\n",
		     cap, ci, &ci->netfs.inode);

		/* iputs = number of inode refs the purge released */
		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}
1727
/*
 * Remove every cap from @session, waiting out concurrent inode
 * deletions, then free any pending cap releases.
 *
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			/* no progress since last pass: give up */
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			/* blocks in __wait_on_freeing_inode() if the
			 * inode is mid-eviction */
			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* detach pending cap releases; still holding s_cap_lock */
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}
1778
d2f8bb27
YZ
/* events passed (as the arg pointer) to wake_up_session_cb() */
enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps. if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		/* reset max-size state so it is re-requested after reconnect */
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
		/* nothing per-inode to do; just wake waiters below */
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}
1815
/* Wake cap waiters on every inode in @session for event @ev (enum above). */
static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}
1822
/*
 * Send periodic message to MDS renewing all currently held caps. The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 *
 * Returns 0 (including when the mds is not yet reconnected) or
 * -ENOMEM.
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	/* log only on the first transition past the ttl */
	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				      ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
1858
186e4f7a
YZ
1859static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1860 struct ceph_mds_session *session, u64 seq)
1861{
1862 struct ceph_msg *msg;
1863
1864 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
a687ecaf 1865 session->s_mds, ceph_session_state_name(session->s_state), seq);
fba97e80 1866 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
186e4f7a
YZ
1867 if (!msg)
1868 return -ENOMEM;
1869 ceph_con_send(&session->s_con, msg);
1870 return 0;
1871}
1872
1873
2f2dc053
SW
1874/*
1875 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
0dc2570f
SW
1876 *
1877 * Called under session->s_mutex
2f2dc053
SW
1878 */
1879static void renewed_caps(struct ceph_mds_client *mdsc,
1880 struct ceph_mds_session *session, int is_renew)
1881{
1882 int was_stale;
1883 int wake = 0;
1884
1885 spin_lock(&session->s_cap_lock);
1ce208a6 1886 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2f2dc053
SW
1887
1888 session->s_cap_ttl = session->s_renew_requested +
1889 mdsc->mdsmap->m_session_timeout*HZ;
1890
1891 if (was_stale) {
1892 if (time_before(jiffies, session->s_cap_ttl)) {
1893 pr_info("mds%d caps renewed\n", session->s_mds);
1894 wake = 1;
1895 } else {
1896 pr_info("mds%d caps still stale\n", session->s_mds);
1897 }
1898 }
1899 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1900 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1901 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1902 spin_unlock(&session->s_cap_lock);
1903
1904 if (wake)
d2f8bb27 1905 wake_up_session_caps(session, RENEWCAPS);
2f2dc053
SW
1906}
1907
/*
 * send a session close request
 *
 * Returns 1 on send (so __close_session's return reflects that a
 * close is in flight) or -ENOMEM.
 */
static int request_close_session(struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
				      session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}
1925
/*
 * Called with s_mutex held.
 *
 * No-op (returns 0) if a close is already in progress or done.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(session);
}
1937
040d7860
YZ
/*
 * If every child of @dentry is negative, shrink them from the dcache.
 * Returns true when the dentry is not a directory or has only
 * negative children (i.e. it is safe to try pruning it).
 */
static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	/* d_lock protects the d_subdirs walk */
	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}
1960
2f2dc053
SW
/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped to.
 *
 * Yes, this is a bit sloppy. Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 *
 * @arg points at the remaining-to-trim counter; returning -1 stops
 * the session cap iteration once the quota is met.
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		/* never drop the auth cap while there is dirty or
		 * in-flight state the mds must still see */
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			/* count == 1 means only our igrab ref is left,
			 * so the inode will be freed on iput */
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}
2046
2047/*
2048 * Trim session cap count down to some max number.
2049 */
e30ee581
ZZ
2050int ceph_trim_caps(struct ceph_mds_client *mdsc,
2051 struct ceph_mds_session *session,
2052 int max_caps)
2f2dc053
SW
2053{
2054 int trim_caps = session->s_nr_caps - max_caps;
2055
2056 dout("trim_caps mds%d start: %d / %d, trim %d\n",
2057 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2058 if (trim_caps > 0) {
533a2818
JL
2059 int remaining = trim_caps;
2060
2061 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2f2dc053
SW
2062 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2063 session->s_mds, session->s_nr_caps, max_caps,
533a2818 2064 trim_caps - remaining);
2f2dc053 2065 }
a56371d9 2066
e3ec8d68 2067 ceph_flush_cap_releases(mdsc, session);
2f2dc053
SW
2068 return 0;
2069}
2070
8310b089
YZ
2071static int check_caps_flush(struct ceph_mds_client *mdsc,
2072 u64 want_flush_tid)
2073{
8310b089
YZ
2074 int ret = 1;
2075
2076 spin_lock(&mdsc->cap_dirty_lock);
e4500b5e
YZ
2077 if (!list_empty(&mdsc->cap_flush_list)) {
2078 struct ceph_cap_flush *cf =
2079 list_first_entry(&mdsc->cap_flush_list,
2080 struct ceph_cap_flush, g_list);
2081 if (cf->tid <= want_flush_tid) {
2082 dout("check_caps_flush still flushing tid "
2083 "%llu <= %llu\n", cf->tid, want_flush_tid);
2084 ret = 0;
2085 }
8310b089
YZ
2086 }
2087 spin_unlock(&mdsc->cap_dirty_lock);
2088 return ret;
d3383a8e
YZ
2089}
2090
/*
 * Wait until all dirty inode data has been flushed through the MDS,
 * i.e. until every cap flush with tid <= @want_flush_tid completes.
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	dout("check_caps_flush want %llu\n", want_flush_tid);

	/* woken via mdsc->cap_flushing_wq as flushes complete */
	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}
2106
/*
 * Send queued cap releases back to the MDS, batching them into
 * CEPH_MSG_CLIENT_CAPRELEASE messages of CEPH_CAPS_PER_RELEASE items.
 * On allocation failure the unsent caps are spliced back onto the
 * session so a later attempt can retry them.
 *
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	/* snapshot the osd epoch barrier to append to each message */
	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	/* detach the pending releases so we can work unlocked */
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
					session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		/* message full: seal it with the barrier and send */
		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	/* more caps may have been queued while we were unlocked */
	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	/* flush any partially filled final message */
	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
	       session->s_mds);
	/* put the unsent caps back so they aren't lost */
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
2203
e3ec8d68
YZ
2204static void ceph_cap_release_work(struct work_struct *work)
2205{
2206 struct ceph_mds_session *session =
2207 container_of(work, struct ceph_mds_session, s_cap_release_work);
2208
2209 mutex_lock(&session->s_mutex);
2210 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2211 session->s_state == CEPH_MDS_SESSION_HUNG)
2212 ceph_send_cap_releases(session->s_mdsc, session);
2213 mutex_unlock(&session->s_mutex);
2214 ceph_put_mds_session(session);
2215}
2216
2217void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2218 struct ceph_mds_session *session)
2219{
2220 if (mdsc->stopping)
2221 return;
2222
5b3248c6 2223 ceph_get_mds_session(session);
e3ec8d68
YZ
2224 if (queue_work(mdsc->fsc->cap_wq,
2225 &session->s_cap_release_work)) {
2226 dout("cap release work queued\n");
2227 } else {
2228 ceph_put_mds_session(session);
2229 dout("failed to queue cap release work\n");
2230 }
2231}
2232
/*
 * Queue a cap for later release back to the MDS; once a full batch of
 * CEPH_CAPS_PER_RELEASE has accumulated, schedule the release work.
 *
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_cap_releases(session->s_mdsc, session);
}
2245
37c4efc1
YZ
2246static void ceph_cap_reclaim_work(struct work_struct *work)
2247{
2248 struct ceph_mds_client *mdsc =
2249 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2250 int ret = ceph_trim_dentries(mdsc);
2251 if (ret == -EAGAIN)
2252 ceph_queue_cap_reclaim_work(mdsc);
2253}
2254
2255void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2256{
2257 if (mdsc->stopping)
2258 return;
2259
2260 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2261 dout("caps reclaim work queued\n");
2262 } else {
2263 dout("failed to queue caps release work\n");
2264 }
2265}
2266
fe33032d
YZ
2267void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2268{
2269 int val;
2270 if (!nr)
2271 return;
2272 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
bba1560b 2273 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
fe33032d
YZ
2274 atomic_set(&mdsc->cap_reclaim_pending, 0);
2275 ceph_queue_cap_reclaim_work(mdsc);
2276 }
2277}
2278
2f2dc053
SW
2279/*
2280 * requests
2281 */
2282
54008399
YZ
2283int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2284 struct inode *dir)
2285{
2286 struct ceph_inode_info *ci = ceph_inode(dir);
2287 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2288 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2a5beea3 2289 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
ad8c28a9
JL
2290 unsigned int num_entries;
2291 int order;
54008399
YZ
2292
2293 spin_lock(&ci->i_ceph_lock);
2294 num_entries = ci->i_files + ci->i_subdirs;
2295 spin_unlock(&ci->i_ceph_lock);
ad8c28a9 2296 num_entries = max(num_entries, 1U);
54008399
YZ
2297 num_entries = min(num_entries, opt->max_readdir);
2298
2299 order = get_order(size * num_entries);
2300 while (order >= 0) {
2a5beea3 2301 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2941bf53
XL
2302 __GFP_NOWARN |
2303 __GFP_ZERO,
2a5beea3
YZ
2304 order);
2305 if (rinfo->dir_entries)
54008399
YZ
2306 break;
2307 order--;
2308 }
2a5beea3 2309 if (!rinfo->dir_entries)
54008399
YZ
2310 return -ENOMEM;
2311
2312 num_entries = (PAGE_SIZE << order) / size;
2313 num_entries = min(num_entries, opt->max_readdir);
2314
2315 rinfo->dir_buf_size = PAGE_SIZE << order;
2316 req->r_num_caps = num_entries + 1;
2317 req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2318 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2319 return 0;
2320}
2321
/*
 * Create an mds request.
 *
 * @op: CEPH_MDS_OP_* operation code (stored in r_op)
 * @mode: mds selection mode (stored in r_direct_mode)
 *
 * Returns a zero-initialized request with one kref held, or
 * ERR_PTR(-ENOMEM).
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req;

	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_start_latency = ktime_get();	/* for latency metrics */
	req->r_resend_mds = -1;			/* no resend hint yet */
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	req->r_feature_needed = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	/* coarse timestamp sent to the MDS with the request */
	ktime_get_coarse_real_ts64(&req->r_stamp);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}
2356
2357/*
44ca18f2 2358 * return oldest (lowest) request, tid in request tree, 0 if none.
2f2dc053
SW
2359 *
2360 * called under mdsc->mutex.
2361 */
44ca18f2
SW
2362static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2363{
2364 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2365 return NULL;
2366 return rb_entry(rb_first(&mdsc->request_tree),
2367 struct ceph_mds_request, r_node);
2368}
2369
/* Oldest outstanding client tid, as tracked by the request tree code. */
static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}
2374
/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 *
 * On success, stores the base inode number in *pbase and the path
 * length in *plen, and returns a pointer into the allocated buffer
 * (free with ceph_mdsc_free_path()).  Consistency against concurrent
 * renames is ensured by retrying under the rename_lock seqlock.
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	/* build the path backwards from the end of the buffer */
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	seq = read_seqbegin(&rename_lock);
	rcu_read_lock();
	temp = dentry;
	for (;;) {
		struct inode *inode;

		spin_lock(&temp->d_lock);
		inode = d_inode(temp);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* .snap dir: emit nothing, leaving a double '/' */
			dout("build_path path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode && dentry != temp &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&temp->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0) {
				spin_unlock(&temp->d_lock);
				break;
			}
			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
		}
		spin_unlock(&temp->d_lock);
		temp = READ_ONCE(temp->d_parent);

		/* Are we at the root? */
		if (IS_ROOT(temp))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	base = ceph_ino(d_inode(temp));
	rcu_read_unlock();

	/* a concurrent rename may have invalidated our walk; retry */
	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * A rename didn't occur, but somehow we didn't end up where
		 * we thought we would. Throw a warning and try again.
		 */
		pr_warn("build_path did not end path lookup where "
			"expected, pos is %d\n", pos);
		goto retry;
	}

	*pbase = base;
	*plen = PATH_MAX - 1 - pos;
	dout("build_path on %p %d built %llx '%.*s'\n",
	     dentry, d_count(dentry), base, *plen, path + pos);
	return path + pos;
}
2463
/*
 * Express a dentry as (ino, path) for an MDS request.
 *
 * Fast path: when the parent directory is locked and non-snapped, the
 * dentry name alone is sufficient relative to the parent's ino.
 * Otherwise fall back to building the full path, in which case
 * *pfreepath is set and the caller must free the returned path.
 */
static int build_dentry_path(struct dentry *dentry, struct inode *dir,
			     const char **ppath, int *ppathlen, u64 *pino,
			     bool *pfreepath, bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
		*pino = ceph_ino(dir);
		rcu_read_unlock();
		/* name is stable: parent is held locked by the caller */
		*ppath = dentry->d_name.name;
		*ppathlen = dentry->d_name.len;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = true;
	return 0;
}
2488
/*
 * Express an inode as (ino, path) for an MDS request.
 *
 * A non-snapped inode is addressed by ino alone (empty path).  A
 * snapped inode needs a full path from one of its aliases; in that
 * case *pfreepath is set and the caller must free the returned path.
 */
static int build_inode_path(struct inode *inode,
			    const char **ppath, int *ppathlen, u64 *pino,
			    bool *pfreepath)
{
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(inode);
		*ppathlen = 0;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = true;
	return 0;
}
2510
2511/*
2512 * request arguments may be specified via an inode *, a dentry *, or
2513 * an explicit ino+path.
2514 */
2515static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
fd36a717
JL
2516 struct inode *rdiri, const char *rpath,
2517 u64 rino, const char **ppath, int *pathlen,
1bcb3440 2518 u64 *ino, bool *freepath, bool parent_locked)
2f2dc053
SW
2519{
2520 int r = 0;
2521
2522 if (rinode) {
2523 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2524 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2525 ceph_snap(rinode));
2526 } else if (rdentry) {
fd36a717 2527 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
1bcb3440 2528 freepath, parent_locked);
2f2dc053
SW
2529 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2530 *ppath);
795858db 2531 } else if (rpath || rino) {
2f2dc053
SW
2532 *ino = rino;
2533 *ppath = rpath;
b000056a 2534 *pathlen = rpath ? strlen(rpath) : 0;
2f2dc053
SW
2535 dout(" path %.*s\n", *pathlen, rpath);
2536 }
2537
2538 return r;
2539}
2540
60267ba3
ID
2541static void encode_timestamp_and_gids(void **p,
2542 const struct ceph_mds_request *req)
2543{
2544 struct ceph_timespec ts;
2545 int i;
2546
2547 ceph_encode_timespec64(&ts, &req->r_stamp);
2548 ceph_encode_copy(p, &ts, sizeof(ts));
2549
2550 /* gid_list */
2551 ceph_encode_32(p, req->r_cred->group_info->ngroups);
2552 for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2553 ceph_encode_64(p, from_kgid(&init_user_ns,
2554 req->r_cred->group_info->gid[i]));
2555}
2556
/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req: resolve both
 * paths, size and allocate the message, fill the (old or new layout)
 * request head, encode paths, cap releases, timestamp and gids, and
 * attach any pagelist payload.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_old *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	/* peers without FS_BTIME only understand the old head layout */
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(NULL, old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two filepaths + timestamp */
	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);
	/* gid list */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	/*
	 * The old ceph_mds_request_head didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		head = msg->front.iov_base;
		p = msg->front.iov_base + sizeof(*head);
	} else {
		struct ceph_mds_request_head *new_head = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		/* the old fields start at oldest_client_tid */
		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
		p = msg->front.iov_base + sizeof(*new_head);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						 req->r_cred->fsuid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						 req->r_cred->fsgid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		/* discard the encoded releases and rewind over them */
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	encode_timestamp_and_gids(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}
2718
/*
 * Complete @req: record the end time for latency metrics, invoke the
 * optional completion callback, and wake all waiters.
 *
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}
2732
4f1ddb1e
JL
2733static struct ceph_mds_request_head_old *
2734find_old_request_head(void *p, u64 features)
2735{
2736 bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2737 struct ceph_mds_request_head *new_head;
2738
2739 if (legacy)
2740 return (struct ceph_mds_request_head_old *)p;
2741 new_head = (struct ceph_mds_request_head *)p;
2742 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2743}
2744
/*
 * Prepare @req's wire message for (re)transmission on @session: either
 * patch the existing message for a replay, or build a fresh one.
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request_head_old *rhead;
	struct ceph_msg *msg;
	int flags = 0, max_retry;

	/*
	 * The type of 'r_attempts' in kernel 'ceph_mds_request'
	 * is 'int', while in 'ceph_mds_request_head' the type of
	 * 'num_retry' is '__u8'. So in case the request retries
	 * exceeding 256 times, the MDS will receive an incorrect
	 * retry seq.
	 *
	 * In this case it's usually a bug in the MDS and continuing
	 * to retry the request makes no sense.
	 *
	 * In future this could be fixed in ceph code, so avoid
	 * using the hardcode here.
	 */
	max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
	max_retry = 1 << (max_retry * BITS_PER_BYTE);
	if (req->r_attempts >= max_retry) {
		pr_warn_ratelimited("%s request tid %llu seq overflow\n",
				    __func__, req->r_tid);
		return -EMULTIHOP;
	}

	req->r_attempts++;
	if (req->r_inode) {
		/* remember the cap migration seq we sent against */
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = find_old_request_head(msg->front.iov_base,
					      session->s_con.peer_features);

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* re-encode timestamp + gids in place of the releases */
		p = msg->front.iov_base + req->r_request_release_offset;
		encode_timestamp_and_gids(&p, req);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(session, req, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = find_old_request_head(msg->front.iov_base,
				      session->s_con.peer_features);
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}
2852
9cf54563
XL
2853/*
2854 * called under mdsc->mutex
2855 */
396bd62c 2856static int __send_request(struct ceph_mds_session *session,
9cf54563
XL
2857 struct ceph_mds_request *req,
2858 bool drop_cap_releases)
2859{
2860 int err;
2861
396bd62c 2862 err = __prepare_send_request(session, req, drop_cap_releases);
9cf54563
XL
2863 if (!err) {
2864 ceph_msg_get(req->r_request);
2865 ceph_con_send(&session->s_con, req->r_request);
2866 }
2867
2868 return err;
2869}
2870
2f2dc053
SW
2871/*
2872 * send request, or put it on the appropriate wait list.
2873 */
d5548492 2874static void __do_request(struct ceph_mds_client *mdsc,
2f2dc053
SW
2875 struct ceph_mds_request *req)
2876{
2877 struct ceph_mds_session *session = NULL;
2878 int mds = -1;
48fec5d0 2879 int err = 0;
c4853e97 2880 bool random;
2f2dc053 2881
bc2de10d
JL
2882 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2883 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
eb1b8af3 2884 __unregister_request(mdsc, req);
d5548492 2885 return;
eb1b8af3 2886 }
2f2dc053 2887
a68e564a
XL
2888 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
2889 dout("do_request metadata corrupted\n");
2890 err = -EIO;
2891 goto finish;
2892 }
2f2dc053
SW
2893 if (req->r_timeout &&
2894 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2895 dout("do_request timed out\n");
8ccf7fcc 2896 err = -ETIMEDOUT;
2f2dc053
SW
2897 goto finish;
2898 }
52953d55 2899 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
48fec5d0
YZ
2900 dout("do_request forced umount\n");
2901 err = -EIO;
2902 goto finish;
2903 }
52953d55 2904 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
e9e427f0
YZ
2905 if (mdsc->mdsmap_err) {
2906 err = mdsc->mdsmap_err;
2907 dout("do_request mdsmap err %d\n", err);
2908 goto finish;
2909 }
cc8e8342
YZ
2910 if (mdsc->mdsmap->m_epoch == 0) {
2911 dout("do_request no mdsmap, waiting for map\n");
2912 list_add(&req->r_wait, &mdsc->waiting_for_map);
d5548492 2913 return;
cc8e8342 2914 }
e9e427f0
YZ
2915 if (!(mdsc->fsc->mount_options->flags &
2916 CEPH_MOUNT_OPT_MOUNTWAIT) &&
2917 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
97820058 2918 err = -EHOSTUNREACH;
e9e427f0
YZ
2919 goto finish;
2920 }
2921 }
2f2dc053 2922
dc69e2e9
SW
2923 put_request_session(req);
2924
c4853e97 2925 mds = __choose_mds(mdsc, req, &random);
2f2dc053
SW
2926 if (mds < 0 ||
2927 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3bb48b41
JL
2928 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2929 err = -EJUKEBOX;
2930 goto finish;
2931 }
2f2dc053
SW
2932 dout("do_request no mds or not active, waiting for map\n");
2933 list_add(&req->r_wait, &mdsc->waiting_for_map);
d5548492 2934 return;
2f2dc053
SW
2935 }
2936
2937 /* get, open session */
2938 session = __ceph_lookup_mds_session(mdsc, mds);
9c423956 2939 if (!session) {
2f2dc053 2940 session = register_session(mdsc, mds);
9c423956
SW
2941 if (IS_ERR(session)) {
2942 err = PTR_ERR(session);
2943 goto finish;
2944 }
2945 }
5b3248c6 2946 req->r_session = ceph_get_mds_session(session);
dc69e2e9 2947
2f2dc053 2948 dout("do_request mds%d session %p state %s\n", mds, session,
a687ecaf 2949 ceph_session_state_name(session->s_state));
6eb06c46
XL
2950
2951 /*
2952 * The old ceph will crash the MDSs when see unknown OPs
2953 */
2954 if (req->r_feature_needed > 0 &&
2955 !test_bit(req->r_feature_needed, &session->s_features)) {
2956 err = -EOPNOTSUPP;
2957 goto out_session;
2958 }
2959
2f2dc053
SW
2960 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2961 session->s_state != CEPH_MDS_SESSION_HUNG) {
3bb48b41
JL
2962 /*
2963 * We cannot queue async requests since the caps and delegated
2964 * inodes are bound to the session. Just return -EJUKEBOX and
2965 * let the caller retry a sync request in that case.
2966 */
2967 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2968 err = -EJUKEBOX;
2969 goto out_session;
2970 }
4ae3713f
JL
2971
2972 /*
2973 * If the session has been REJECTED, then return a hard error,
2974 * unless it's a CLEANRECOVER mount, in which case we'll queue
2975 * it to the mdsc queue.
2976 */
2977 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2978 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2979 list_add(&req->r_wait, &mdsc->waiting_for_map);
2980 else
2981 err = -EACCES;
2982 goto out_session;
2983 }
2984
2f2dc053 2985 if (session->s_state == CEPH_MDS_SESSION_NEW ||
c4853e97 2986 session->s_state == CEPH_MDS_SESSION_CLOSING) {
b682c6d4
XL
2987 err = __open_session(mdsc, session);
2988 if (err)
2989 goto out_session;
c4853e97
XL
2990 /* retry the same mds later */
2991 if (random)
2992 req->r_resend_mds = mds;
2993 }
2f2dc053
SW
2994 list_add(&req->r_wait, &session->s_waiting);
2995 goto out_session;
2996 }
2997
2998 /* send request */
2f2dc053
SW
2999 req->r_resend_mds = -1; /* forget any previous mds hint */
3000
3001 if (req->r_request_started == 0) /* note request start time */
3002 req->r_request_started = jiffies;
3003
00061645
XL
3004 /*
3005 * For async create we will choose the auth MDS of frag in parent
3006 * directory to send the request and usually this works fine, but
3007 * if the MDS migrated the directory to another MDS before it could
3008 * handle it the request will be forwarded.
3009 *
3010 * And then the auth cap will be changed.
3011 */
3012 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3013 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3014 struct ceph_inode_info *ci;
3015 struct ceph_cap *cap;
3016
3017 /*
3018 * The request maybe handled very fast and the new inode
3019 * hasn't been linked to the dentry yet. We need to wait
3020 * for the ceph_finish_async_create(), which shouldn't be
3021 * stuck too long or fail in theory, to finish when forwarding
3022 * the request.
3023 */
3024 if (!d_inode(req->r_dentry)) {
3025 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3026 TASK_KILLABLE);
3027 if (err) {
3028 mutex_lock(&req->r_fill_mutex);
3029 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3030 mutex_unlock(&req->r_fill_mutex);
3031 goto out_session;
3032 }
3033 }
3034
3035 ci = ceph_inode(d_inode(req->r_dentry));
3036
3037 spin_lock(&ci->i_ceph_lock);
3038 cap = ci->i_auth_cap;
3039 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3040 dout("do_request session changed for auth cap %d -> %d\n",
3041 cap->session->s_mds, session->s_mds);
3042
3043 /* Remove the auth cap from old session */
3044 spin_lock(&cap->session->s_cap_lock);
3045 cap->session->s_nr_caps--;
3046 list_del_init(&cap->session_caps);
3047 spin_unlock(&cap->session->s_cap_lock);
3048
3049 /* Add the auth cap to the new session */
3050 cap->mds = mds;
3051 cap->session = session;
3052 spin_lock(&session->s_cap_lock);
3053 session->s_nr_caps++;
3054 list_add_tail(&cap->session_caps, &session->s_caps);
3055 spin_unlock(&session->s_cap_lock);
3056
3057 change_auth_cap_ses(ci, session);
3058 }
3059 spin_unlock(&ci->i_ceph_lock);
3060 }
3061
396bd62c 3062 err = __send_request(session, req, false);
2f2dc053
SW
3063
3064out_session:
3065 ceph_put_mds_session(session);
48fec5d0
YZ
3066finish:
3067 if (err) {
3068 dout("__do_request early error %d\n", err);
3069 req->r_err = err;
3070 complete_request(mdsc, req);
3071 __unregister_request(mdsc, req);
3072 }
d5548492 3073 return;
2f2dc053
SW
3074}
3075
3076/*
3077 * called under mdsc->mutex
3078 */
3079static void __wake_requests(struct ceph_mds_client *mdsc,
3080 struct list_head *head)
3081{
ed75ec2c
YZ
3082 struct ceph_mds_request *req;
3083 LIST_HEAD(tmp_list);
3084
3085 list_splice_init(head, &tmp_list);
2f2dc053 3086
ed75ec2c
YZ
3087 while (!list_empty(&tmp_list)) {
3088 req = list_entry(tmp_list.next,
3089 struct ceph_mds_request, r_wait);
2f2dc053 3090 list_del_init(&req->r_wait);
7971bd92 3091 dout(" wake request %p tid %llu\n", req, req->r_tid);
2f2dc053
SW
3092 __do_request(mdsc, req);
3093 }
3094}
3095
3096/*
3097 * Wake up threads with requests pending for @mds, so that they can
29790f26 3098 * resubmit their requests to a possibly different mds.
2f2dc053 3099 */
29790f26 3100static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2f2dc053 3101{
44ca18f2 3102 struct ceph_mds_request *req;
282c1052 3103 struct rb_node *p = rb_first(&mdsc->request_tree);
2f2dc053
SW
3104
3105 dout("kick_requests mds%d\n", mds);
282c1052 3106 while (p) {
44ca18f2 3107 req = rb_entry(p, struct ceph_mds_request, r_node);
282c1052 3108 p = rb_next(p);
bc2de10d 3109 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
44ca18f2 3110 continue;
3de22be6
YZ
3111 if (req->r_attempts > 0)
3112 continue; /* only new requests */
44ca18f2
SW
3113 if (req->r_session &&
3114 req->r_session->s_mds == mds) {
3115 dout(" kicking tid %llu\n", req->r_tid);
03974e81 3116 list_del_init(&req->r_wait);
44ca18f2 3117 __do_request(mdsc, req);
2f2dc053
SW
3118 }
3119 }
3120}
3121
86bda539 3122int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2f2dc053
SW
3123 struct ceph_mds_request *req)
3124{
891f3f5a 3125 int err = 0;
86bda539
JL
3126
3127 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3128 if (req->r_inode)
3129 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
9c1c2b35 3130 if (req->r_parent) {
719a2514
YZ
3131 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3132 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3133 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3134 spin_lock(&ci->i_ceph_lock);
3135 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3136 __ceph_touch_fmode(ci, mdsc, fmode);
3137 spin_unlock(&ci->i_ceph_lock);
9c1c2b35 3138 }
86bda539
JL
3139 if (req->r_old_dentry_dir)
3140 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3141 CEPH_CAP_PIN);
3142
891f3f5a
JL
3143 if (req->r_inode) {
3144 err = ceph_wait_on_async_create(req->r_inode);
3145 if (err) {
3146 dout("%s: wait for async create returned: %d\n",
3147 __func__, err);
3148 return err;
3149 }
3150 }
3151
3152 if (!err && req->r_old_inode) {
3153 err = ceph_wait_on_async_create(req->r_old_inode);
3154 if (err) {
3155 dout("%s: wait for async create returned: %d\n",
3156 __func__, err);
3157 return err;
3158 }
3159 }
3160
86bda539 3161 dout("submit_request on %p for inode %p\n", req, dir);
2f2dc053 3162 mutex_lock(&mdsc->mutex);
86bda539 3163 __register_request(mdsc, req, dir);
2f2dc053 3164 __do_request(mdsc, req);
86bda539 3165 err = req->r_err;
2f2dc053 3166 mutex_unlock(&mdsc->mutex);
86bda539 3167 return err;
2f2dc053
SW
3168}
3169
9eaa7b79
JL
3170int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3171 struct ceph_mds_request *req,
3172 ceph_mds_request_wait_callback_t wait_func)
2f2dc053
SW
3173{
3174 int err;
3175
e1518c7c 3176 /* wait */
e1518c7c 3177 dout("do_request waiting\n");
9eaa7b79
JL
3178 if (wait_func) {
3179 err = wait_func(mdsc, req);
e1518c7c 3180 } else {
5be73034
ID
3181 long timeleft = wait_for_completion_killable_timeout(
3182 &req->r_completion,
3183 ceph_timeout_jiffies(req->r_timeout));
3184 if (timeleft > 0)
3185 err = 0;
3186 else if (!timeleft)
8ccf7fcc 3187 err = -ETIMEDOUT; /* timed out */
5be73034
ID
3188 else
3189 err = timeleft; /* killed */
e1518c7c
SW
3190 }
3191 dout("do_request waited, got %d\n", err);
3192 mutex_lock(&mdsc->mutex);
5b1daecd 3193
e1518c7c 3194 /* only abort if we didn't race with a real reply */
bc2de10d 3195 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
e1518c7c
SW
3196 err = le32_to_cpu(req->r_reply_info.head->result);
3197 } else if (err < 0) {
3198 dout("aborted request %lld with %d\n", req->r_tid, err);
b4556396
SW
3199
3200 /*
3201 * ensure we aren't running concurrently with
3202 * ceph_fill_trace or ceph_readdir_prepopulate, which
3203 * rely on locks (dir mutex) held by our caller.
3204 */
3205 mutex_lock(&req->r_fill_mutex);
e1518c7c 3206 req->r_err = err;
bc2de10d 3207 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
b4556396 3208 mutex_unlock(&req->r_fill_mutex);
5b1daecd 3209
3dd69aab 3210 if (req->r_parent &&
167c9e35
SW
3211 (req->r_op & CEPH_MDS_OP_WRITE))
3212 ceph_invalidate_dir_request(req);
2f2dc053 3213 } else {
e1518c7c 3214 err = req->r_err;
2f2dc053 3215 }
2f2dc053 3216
e1518c7c 3217 mutex_unlock(&mdsc->mutex);
8340f22c
JL
3218 return err;
3219}
3220
3221/*
3222 * Synchrously perform an mds request. Take care of all of the
3223 * session setup, forwarding, retry details.
3224 */
3225int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3226 struct inode *dir,
3227 struct ceph_mds_request *req)
3228{
3229 int err;
3230
3231 dout("do_request on %p\n", req);
3232
3233 /* issue */
3234 err = ceph_mdsc_submit_request(mdsc, dir, req);
3235 if (!err)
9eaa7b79 3236 err = ceph_mdsc_wait_request(mdsc, req, NULL);
2f2dc053
SW
3237 dout("do_request %p done, result %d\n", req, err);
3238 return err;
3239}
3240
167c9e35 3241/*
2f276c51 3242 * Invalidate dir's completeness, dentry lease state on an aborted MDS
167c9e35
SW
3243 * namespace request.
3244 */
3245void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3246{
8d8f371c
YZ
3247 struct inode *dir = req->r_parent;
3248 struct inode *old_dir = req->r_old_dentry_dir;
167c9e35 3249
8d8f371c 3250 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
167c9e35 3251
8d8f371c
YZ
3252 ceph_dir_clear_complete(dir);
3253 if (old_dir)
3254 ceph_dir_clear_complete(old_dir);
167c9e35
SW
3255 if (req->r_dentry)
3256 ceph_invalidate_dentry_lease(req->r_dentry);
3257 if (req->r_old_dentry)
3258 ceph_invalidate_dentry_lease(req->r_old_dentry);
3259}
3260
2f2dc053
SW
3261/*
3262 * Handle mds reply.
3263 *
3264 * We take the session mutex and parse and process the reply immediately.
3265 * This preserves the logical ordering of replies, capabilities, etc., sent
3266 * by the MDS as they are applied to our local cache.
3267 */
3268static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3269{
3270 struct ceph_mds_client *mdsc = session->s_mdsc;
3271 struct ceph_mds_request *req;
3272 struct ceph_mds_reply_head *head = msg->front.iov_base;
3273 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
982d6011 3274 struct ceph_snap_realm *realm;
2f2dc053
SW
3275 u64 tid;
3276 int err, result;
2600d2dd 3277 int mds = session->s_mds;
a68e564a 3278 bool close_sessions = false;
2f2dc053 3279
2f2dc053
SW
3280 if (msg->front.iov_len < sizeof(*head)) {
3281 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
9ec7cab1 3282 ceph_msg_dump(msg);
2f2dc053
SW
3283 return;
3284 }
3285
3286 /* get request, session */
6df058c0 3287 tid = le64_to_cpu(msg->hdr.tid);
2f2dc053 3288 mutex_lock(&mdsc->mutex);
fcd00b68 3289 req = lookup_get_request(mdsc, tid);
2f2dc053
SW
3290 if (!req) {
3291 dout("handle_reply on unknown tid %llu\n", tid);
3292 mutex_unlock(&mdsc->mutex);
3293 return;
3294 }
3295 dout("handle_reply %p\n", req);
2f2dc053
SW
3296
3297 /* correct session? */
d96d6049 3298 if (req->r_session != session) {
2f2dc053
SW
3299 pr_err("mdsc_handle_reply got %llu on session mds%d"
3300 " not mds%d\n", tid, session->s_mds,
3301 req->r_session ? req->r_session->s_mds : -1);
3302 mutex_unlock(&mdsc->mutex);
3303 goto out;
3304 }
3305
3306 /* dup? */
bc2de10d
JL
3307 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3308 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
f3ae1b97 3309 pr_warn("got a dup %s reply on %llu from mds%d\n",
2f2dc053
SW
3310 head->safe ? "safe" : "unsafe", tid, mds);
3311 mutex_unlock(&mdsc->mutex);
3312 goto out;
3313 }
bc2de10d 3314 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
f3ae1b97 3315 pr_warn("got unsafe after safe on %llu from mds%d\n",
85792d0d
SW
3316 tid, mds);
3317 mutex_unlock(&mdsc->mutex);
3318 goto out;
3319 }
2f2dc053
SW
3320
3321 result = le32_to_cpu(head->result);
3322
2f2dc053 3323 if (head->safe) {
bc2de10d 3324 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2f2dc053 3325 __unregister_request(mdsc, req);
2f2dc053 3326
07edc057
XL
3327 /* last request during umount? */
3328 if (mdsc->stopping && !__get_oldest_req(mdsc))
3329 complete_all(&mdsc->safe_umount_waiters);
3330
bc2de10d 3331 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2f2dc053
SW
3332 /*
3333 * We already handled the unsafe response, now do the
3334 * cleanup. No need to examine the response; the MDS
3335 * doesn't include any result info in the safe
3336 * response. And even if it did, there is nothing
3337 * useful we could do with a revised return value.
3338 */
3339 dout("got safe reply %llu, mds%d\n", tid, mds);
2f2dc053 3340
2f2dc053
SW
3341 mutex_unlock(&mdsc->mutex);
3342 goto out;
3343 }
e1518c7c 3344 } else {
bc2de10d 3345 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2f2dc053
SW
3346 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3347 }
3348
3349 dout("handle_reply tid %lld result %d\n", tid, result);
3350 rinfo = &req->r_reply_info;
b37fe1f9 3351 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
d4846487 3352 err = parse_reply_info(session, msg, rinfo, (u64)-1);
b37fe1f9 3353 else
d4846487 3354 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
2f2dc053
SW
3355 mutex_unlock(&mdsc->mutex);
3356
bca9fc14
JL
3357 /* Must find target inode outside of mutexes to avoid deadlocks */
3358 if ((err >= 0) && rinfo->head->is_target) {
3359 struct inode *in;
3360 struct ceph_vino tvino = {
3361 .ino = le64_to_cpu(rinfo->targeti.in->ino),
3362 .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3363 };
3364
3365 in = ceph_get_inode(mdsc->fsc->sb, tvino);
3366 if (IS_ERR(in)) {
3367 err = PTR_ERR(in);
3368 mutex_lock(&session->s_mutex);
3369 goto out_err;
3370 }
3371 req->r_target_inode = in;
3372 }
3373
2f2dc053
SW
3374 mutex_lock(&session->s_mutex);
3375 if (err < 0) {
25933abd 3376 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
9ec7cab1 3377 ceph_msg_dump(msg);
2f2dc053
SW
3378 goto out_err;
3379 }
3380
3381 /* snap trace */
982d6011 3382 realm = NULL;
2f2dc053
SW
3383 if (rinfo->snapblob_len) {
3384 down_write(&mdsc->snap_rwsem);
a68e564a 3385 err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
982d6011
YZ
3386 rinfo->snapblob + rinfo->snapblob_len,
3387 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3388 &realm);
a68e564a
XL
3389 if (err) {
3390 up_write(&mdsc->snap_rwsem);
3391 close_sessions = true;
3392 if (err == -EIO)
3393 ceph_msg_dump(msg);
3394 goto out_err;
3395 }
2f2dc053
SW
3396 downgrade_write(&mdsc->snap_rwsem);
3397 } else {
3398 down_read(&mdsc->snap_rwsem);
3399 }
3400
3401 /* insert trace into our cache */
b4556396 3402 mutex_lock(&req->r_fill_mutex);
315f2408 3403 current->journal_info = req;
f5a03b08 3404 err = ceph_fill_trace(mdsc->fsc->sb, req);
2f2dc053 3405 if (err == 0) {
6e8575fa 3406 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
81c6aea5 3407 req->r_op == CEPH_MDS_OP_LSSNAP))
2f2dc053 3408 ceph_readdir_prepopulate(req, req->r_session);
2f2dc053 3409 }
315f2408 3410 current->journal_info = NULL;
b4556396 3411 mutex_unlock(&req->r_fill_mutex);
2f2dc053
SW
3412
3413 up_read(&mdsc->snap_rwsem);
982d6011
YZ
3414 if (realm)
3415 ceph_put_snap_realm(mdsc, realm);
68cd5b4b 3416
fe33032d
YZ
3417 if (err == 0) {
3418 if (req->r_target_inode &&
3419 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3420 struct ceph_inode_info *ci =
3421 ceph_inode(req->r_target_inode);
3422 spin_lock(&ci->i_unsafe_lock);
3423 list_add_tail(&req->r_unsafe_target_item,
3424 &ci->i_unsafe_iops);
3425 spin_unlock(&ci->i_unsafe_lock);
3426 }
3427
3428 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
68cd5b4b 3429 }
2f2dc053 3430out_err:
e1518c7c 3431 mutex_lock(&mdsc->mutex);
bc2de10d 3432 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
e1518c7c
SW
3433 if (err) {
3434 req->r_err = err;
3435 } else {
5fdb1389 3436 req->r_reply = ceph_msg_get(msg);
bc2de10d 3437 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
e1518c7c 3438 }
2f2dc053 3439 } else {
e1518c7c 3440 dout("reply arrived after request %lld was aborted\n", tid);
2f2dc053 3441 }
e1518c7c 3442 mutex_unlock(&mdsc->mutex);
2f2dc053 3443
2f2dc053
SW
3444 mutex_unlock(&session->s_mutex);
3445
3446 /* kick calling process */
3447 complete_request(mdsc, req);
70c94820 3448
8ae99ae2 3449 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
70c94820 3450 req->r_end_latency, err);
2f2dc053
SW
3451out:
3452 ceph_mdsc_put_request(req);
a68e564a
XL
3453
3454 /* Defer closing the sessions after s_mutex lock being released */
3455 if (close_sessions)
3456 ceph_mdsc_close_sessions(mdsc);
2f2dc053
SW
3457 return;
3458}
3459
3460
3461
3462/*
3463 * handle mds notification that our request has been forwarded.
3464 */
2600d2dd
SW
3465static void handle_forward(struct ceph_mds_client *mdsc,
3466 struct ceph_mds_session *session,
3467 struct ceph_msg *msg)
2f2dc053
SW
3468{
3469 struct ceph_mds_request *req;
a1ea787c 3470 u64 tid = le64_to_cpu(msg->hdr.tid);
2f2dc053
SW
3471 u32 next_mds;
3472 u32 fwd_seq;
2f2dc053
SW
3473 int err = -EINVAL;
3474 void *p = msg->front.iov_base;
3475 void *end = p + msg->front.iov_len;
1980b1bf 3476 bool aborted = false;
2f2dc053 3477
a1ea787c 3478 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
3479 next_mds = ceph_decode_32(&p);
3480 fwd_seq = ceph_decode_32(&p);
2f2dc053
SW
3481
3482 mutex_lock(&mdsc->mutex);
fcd00b68 3483 req = lookup_get_request(mdsc, tid);
2f2dc053 3484 if (!req) {
1980b1bf 3485 mutex_unlock(&mdsc->mutex);
2a8e5e36 3486 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
1980b1bf 3487 return; /* dup reply? */
2f2dc053
SW
3488 }
3489
bc2de10d 3490 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2a8e5e36
SW
3491 dout("forward tid %llu aborted, unregistering\n", tid);
3492 __unregister_request(mdsc, req);
3493 } else if (fwd_seq <= req->r_num_fwd) {
1980b1bf
XL
3494 /*
3495 * The type of 'num_fwd' in ceph 'MClientRequestForward'
3496 * is 'int32_t', while in 'ceph_mds_request_head' the
3497 * type is '__u8'. So in case the request bounces between
3498 * MDSes exceeding 256 times, the client will get stuck.
3499 *
3500 * In this case it's ususally a bug in MDS and continue
3501 * bouncing the request makes no sense.
3502 *
3503 * In future this could be fixed in ceph code, so avoid
3504 * using the hardcode here.
3505 */
3506 int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
3507 max = 1 << (max * BITS_PER_BYTE);
3508 if (req->r_num_fwd >= max) {
3509 mutex_lock(&req->r_fill_mutex);
3510 req->r_err = -EMULTIHOP;
3511 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3512 mutex_unlock(&req->r_fill_mutex);
3513 aborted = true;
3514 pr_warn_ratelimited("forward tid %llu seq overflow\n",
3515 tid);
3516 } else {
3517 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3518 tid, next_mds, req->r_num_fwd, fwd_seq);
3519 }
2f2dc053
SW
3520 } else {
3521 /* resend. forward race not possible; mds would drop */
2a8e5e36
SW
3522 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3523 BUG_ON(req->r_err);
bc2de10d 3524 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3de22be6 3525 req->r_attempts = 0;
2f2dc053
SW
3526 req->r_num_fwd = fwd_seq;
3527 req->r_resend_mds = next_mds;
3528 put_request_session(req);
3529 __do_request(mdsc, req);
3530 }
2f2dc053 3531 mutex_unlock(&mdsc->mutex);
1980b1bf
XL
3532
3533 /* kick calling process */
3534 if (aborted)
3535 complete_request(mdsc, req);
3536 ceph_mdsc_put_request(req);
2f2dc053
SW
3537 return;
3538
3539bad:
3540 pr_err("mdsc_handle_forward decode error err=%d\n", err);
3541}
3542
131d7eb4 3543static int __decode_session_metadata(void **p, void *end,
0b98acd6 3544 bool *blocklisted)
84bf3950
YZ
3545{
3546 /* map<string,string> */
3547 u32 n;
131d7eb4 3548 bool err_str;
84bf3950
YZ
3549 ceph_decode_32_safe(p, end, n, bad);
3550 while (n-- > 0) {
3551 u32 len;
3552 ceph_decode_32_safe(p, end, len, bad);
3553 ceph_decode_need(p, end, len, bad);
131d7eb4 3554 err_str = !strncmp(*p, "error_string", len);
84bf3950
YZ
3555 *p += len;
3556 ceph_decode_32_safe(p, end, len, bad);
3557 ceph_decode_need(p, end, len, bad);
4bb926e8
ID
3558 /*
3559 * Match "blocklisted (blacklisted)" from newer MDSes,
3560 * or "blacklisted" from older MDSes.
3561 */
131d7eb4 3562 if (err_str && strnstr(*p, "blacklisted", len))
0b98acd6 3563 *blocklisted = true;
84bf3950
YZ
3564 *p += len;
3565 }
3566 return 0;
3567bad:
3568 return -1;
3569}
3570
2f2dc053
SW
3571/*
3572 * handle a mds session control message
3573 */
3574static void handle_session(struct ceph_mds_session *session,
3575 struct ceph_msg *msg)
3576{
3577 struct ceph_mds_client *mdsc = session->s_mdsc;
84bf3950
YZ
3578 int mds = session->s_mds;
3579 int msg_version = le16_to_cpu(msg->hdr.version);
3580 void *p = msg->front.iov_base;
3581 void *end = p + msg->front.iov_len;
3582 struct ceph_mds_session_head *h;
2f2dc053 3583 u32 op;
0fa82633 3584 u64 seq, features = 0;
2f2dc053 3585 int wake = 0;
0b98acd6 3586 bool blocklisted = false;
2f2dc053 3587
2f2dc053 3588 /* decode */
84bf3950
YZ
3589 ceph_decode_need(&p, end, sizeof(*h), bad);
3590 h = p;
3591 p += sizeof(*h);
3592
2f2dc053
SW
3593 op = le32_to_cpu(h->op);
3594 seq = le64_to_cpu(h->seq);
3595
84bf3950
YZ
3596 if (msg_version >= 3) {
3597 u32 len;
e1c9788c
KH
3598 /* version >= 2 and < 5, decode metadata, skip otherwise
3599 * as it's handled via flags.
3600 */
3601 if (msg_version >= 5)
3602 ceph_decode_skip_map(&p, end, string, string, bad);
3603 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
84bf3950 3604 goto bad;
e1c9788c 3605
84bf3950
YZ
3606 /* version >= 3, feature bits */
3607 ceph_decode_32_safe(&p, end, len, bad);
02e37571
JL
3608 if (len) {
3609 ceph_decode_64_safe(&p, end, features, bad);
3610 p += len - sizeof(features);
3611 }
84bf3950
YZ
3612 }
3613
e1c9788c 3614 if (msg_version >= 5) {
ea16567f
LH
3615 u32 flags, len;
3616
3617 /* version >= 4 */
3618 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
3619 ceph_decode_32_safe(&p, end, len, bad); /* len */
3620 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
3621
e1c9788c 3622 /* version >= 5, flags */
ea16567f 3623 ceph_decode_32_safe(&p, end, flags, bad);
e1c9788c 3624 if (flags & CEPH_SESSION_BLOCKLISTED) {
ea16567f 3625 pr_warn("mds%d session blocklisted\n", session->s_mds);
e1c9788c
KH
3626 blocklisted = true;
3627 }
3628 }
3629
2f2dc053 3630 mutex_lock(&mdsc->mutex);
0a07fc8c 3631 if (op == CEPH_SESSION_CLOSE) {
5b3248c6 3632 ceph_get_mds_session(session);
2600d2dd 3633 __unregister_session(mdsc, session);
0a07fc8c 3634 }
2f2dc053
SW
3635 /* FIXME: this ttl calculation is generous */
3636 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3637 mutex_unlock(&mdsc->mutex);
3638
3639 mutex_lock(&session->s_mutex);
3640
3641 dout("handle_session mds%d %s %p state %s seq %llu\n",
3642 mds, ceph_session_op_name(op), session,
a687ecaf 3643 ceph_session_state_name(session->s_state), seq);
2f2dc053
SW
3644
3645 if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3646 session->s_state = CEPH_MDS_SESSION_OPEN;
3647 pr_info("mds%d came back\n", session->s_mds);
3648 }
3649
3650 switch (op) {
3651 case CEPH_SESSION_OPEN:
29790f26
SW
3652 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3653 pr_info("mds%d reconnect success\n", session->s_mds);
300e42a2
XL
3654
3655 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
3656 pr_notice("mds%d is already opened\n", session->s_mds);
3657 } else {
3658 session->s_state = CEPH_MDS_SESSION_OPEN;
3659 session->s_features = features;
3660 renewed_caps(mdsc, session, 0);
3661 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
3662 &session->s_features))
3663 metric_schedule_delayed(&mdsc->metric);
3664 }
3665
3666 /*
3667 * The connection maybe broken and the session in client
3668 * side has been reinitialized, need to update the seq
3669 * anyway.
3670 */
3671 if (!session->s_seq && seq)
3672 session->s_seq = seq;
3673
2f2dc053
SW
3674 wake = 1;
3675 if (mdsc->stopping)
3676 __close_session(mdsc, session);
3677 break;
3678
3679 case CEPH_SESSION_RENEWCAPS:
3680 if (session->s_renew_seq == seq)
3681 renewed_caps(mdsc, session, 1);
3682 break;
3683
3684 case CEPH_SESSION_CLOSE:
29790f26
SW
3685 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3686 pr_info("mds%d reconnect denied\n", session->s_mds);
4d681c2f 3687 session->s_state = CEPH_MDS_SESSION_CLOSED;
1c841a96 3688 cleanup_session_requests(mdsc, session);
2f2dc053 3689 remove_session_caps(session);
656e4382 3690 wake = 2; /* for good measure */
f3c60c59 3691 wake_up_all(&mdsc->session_close_wq);
2f2dc053
SW
3692 break;
3693
3694 case CEPH_SESSION_STALE:
3695 pr_info("mds%d caps went stale, renewing\n",
3696 session->s_mds);
52d60f8e 3697 atomic_inc(&session->s_cap_gen);
1ce208a6 3698 session->s_cap_ttl = jiffies - 1;
2f2dc053
SW
3699 send_renew_caps(mdsc, session);
3700 break;
3701
3702 case CEPH_SESSION_RECALL_STATE:
e30ee581 3703 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2f2dc053
SW
3704 break;
3705
186e4f7a 3706 case CEPH_SESSION_FLUSHMSG:
e7d84c6a
XL
3707 /* flush cap releases */
3708 spin_lock(&session->s_cap_lock);
3709 if (session->s_num_cap_releases)
3710 ceph_flush_cap_releases(mdsc, session);
3711 spin_unlock(&session->s_cap_lock);
3712
186e4f7a
YZ
3713 send_flushmsg_ack(mdsc, session, seq);
3714 break;
3715
03f4fcb0
YZ
3716 case CEPH_SESSION_FORCE_RO:
3717 dout("force_session_readonly %p\n", session);
3718 spin_lock(&session->s_cap_lock);
3719 session->s_readonly = true;
3720 spin_unlock(&session->s_cap_lock);
d2f8bb27 3721 wake_up_session_caps(session, FORCE_RO);
03f4fcb0
YZ
3722 break;
3723
fcff415c
YZ
3724 case CEPH_SESSION_REJECT:
3725 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3726 pr_info("mds%d rejected session\n", session->s_mds);
3727 session->s_state = CEPH_MDS_SESSION_REJECTED;
3728 cleanup_session_requests(mdsc, session);
3729 remove_session_caps(session);
0b98acd6
ID
3730 if (blocklisted)
3731 mdsc->fsc->blocklisted = true;
fcff415c
YZ
3732 wake = 2; /* for good measure */
3733 break;
3734
2f2dc053
SW
3735 default:
3736 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3737 WARN_ON(1);
3738 }
3739
3740 mutex_unlock(&session->s_mutex);
3741 if (wake) {
3742 mutex_lock(&mdsc->mutex);
3743 __wake_requests(mdsc, &session->s_waiting);
656e4382
YZ
3744 if (wake == 2)
3745 kick_requests(mdsc, mds);
2f2dc053
SW
3746 mutex_unlock(&mdsc->mutex);
3747 }
0a07fc8c
YZ
3748 if (op == CEPH_SESSION_CLOSE)
3749 ceph_put_mds_session(session);
2f2dc053
SW
3750 return;
3751
3752bad:
3753 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3754 (int)msg->front.iov_len);
9ec7cab1 3755 ceph_msg_dump(msg);
2f2dc053
SW
3756 return;
3757}
3758
a25949b9
JL
3759void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3760{
3761 int dcaps;
3762
3763 dcaps = xchg(&req->r_dir_caps, 0);
3764 if (dcaps) {
3765 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3766 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3767 }
3768}
3769
e64f44a8
XL
3770void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3771{
3772 int dcaps;
3773
3774 dcaps = xchg(&req->r_dir_caps, 0);
3775 if (dcaps) {
3776 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3777 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3778 dcaps);
3779 }
3780}
3781
2f2dc053
SW
3782/*
3783 * called under session->mutex.
3784 */
3785static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3786 struct ceph_mds_session *session)
3787{
3788 struct ceph_mds_request *req, *nreq;
3de22be6 3789 struct rb_node *p;
2f2dc053
SW
3790
3791 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3792
3793 mutex_lock(&mdsc->mutex);
9cf54563 3794 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
396bd62c 3795 __send_request(session, req, true);
3de22be6
YZ
3796
3797 /*
3798 * also re-send old requests when MDS enters reconnect stage. So that MDS
3799 * can process completed request in clientreplay stage.
3800 */
3801 p = rb_first(&mdsc->request_tree);
3802 while (p) {
3803 req = rb_entry(p, struct ceph_mds_request, r_node);
3804 p = rb_next(p);
bc2de10d 3805 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3de22be6
YZ
3806 continue;
3807 if (req->r_attempts == 0)
3808 continue; /* only old requests */
a25949b9
JL
3809 if (!req->r_session)
3810 continue;
3811 if (req->r_session->s_mds != session->s_mds)
3812 continue;
3813
e64f44a8 3814 ceph_mdsc_release_dir_caps_no_check(req);
a25949b9 3815
396bd62c 3816 __send_request(session, req, true);
3de22be6 3817 }
2f2dc053
SW
3818 mutex_unlock(&mdsc->mutex);
3819}
3820
81c5a148
YZ
3821static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3822{
3823 struct ceph_msg *reply;
3824 struct ceph_pagelist *_pagelist;
3825 struct page *page;
3826 __le32 *addr;
3827 int err = -ENOMEM;
3828
3829 if (!recon_state->allow_multi)
3830 return -ENOSPC;
3831
3832 /* can't handle message that contains both caps and realm */
3833 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3834
3835 /* pre-allocate new pagelist */
3836 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3837 if (!_pagelist)
3838 return -ENOMEM;
3839
3840 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3841 if (!reply)
3842 goto fail_msg;
3843
3844 /* placeholder for nr_caps */
3845 err = ceph_pagelist_encode_32(_pagelist, 0);
3846 if (err < 0)
3847 goto fail;
3848
3849 if (recon_state->nr_caps) {
3850 /* currently encoding caps */
3851 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3852 if (err)
3853 goto fail;
3854 } else {
3855 /* placeholder for nr_realms (currently encoding relams) */
3856 err = ceph_pagelist_encode_32(_pagelist, 0);
3857 if (err < 0)
3858 goto fail;
3859 }
3860
3861 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3862 if (err)
3863 goto fail;
3864
3865 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3866 addr = kmap_atomic(page);
3867 if (recon_state->nr_caps) {
3868 /* currently encoding caps */
3869 *addr = cpu_to_le32(recon_state->nr_caps);
3870 } else {
3871 /* currently encoding relams */
3872 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3873 }
3874 kunmap_atomic(addr);
3875
3876 reply->hdr.version = cpu_to_le16(5);
3877 reply->hdr.compat_version = cpu_to_le16(4);
3878
3879 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3880 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3881
3882 ceph_con_send(&recon_state->session->s_con, reply);
3883 ceph_pagelist_release(recon_state->pagelist);
3884
3885 recon_state->pagelist = _pagelist;
3886 recon_state->nr_caps = 0;
3887 recon_state->nr_realms = 0;
3888 recon_state->msg_version = 5;
3889 return 0;
3890fail:
3891 ceph_msg_put(reply);
3892fail_msg:
3893 ceph_pagelist_release(_pagelist);
3894 return err;
3895}
3896
a33f6432
YZ
3897static struct dentry* d_find_primary(struct inode *inode)
3898{
3899 struct dentry *alias, *dn = NULL;
3900
3901 if (hlist_empty(&inode->i_dentry))
3902 return NULL;
3903
3904 spin_lock(&inode->i_lock);
3905 if (hlist_empty(&inode->i_dentry))
3906 goto out_unlock;
3907
3908 if (S_ISDIR(inode->i_mode)) {
3909 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3910 if (!IS_ROOT(alias))
3911 dn = dget(alias);
3912 goto out_unlock;
3913 }
3914
3915 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3916 spin_lock(&alias->d_lock);
3917 if (!d_unhashed(alias) &&
3918 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3919 dn = dget_dlock(alias);
3920 }
3921 spin_unlock(&alias->d_lock);
3922 if (dn)
3923 break;
3924 }
3925out_unlock:
3926 spin_unlock(&inode->i_lock);
3927 return dn;
3928}
3929
2f2dc053
SW
3930/*
3931 * Encode information about a cap for a reconnect with the MDS.
3932 */
/*
 * Per-cap callback for the reconnect path (invoked via
 * ceph_iterate_session_caps from send_mds_reconnect): encode one inode's
 * cap state for @mds into recon_state->pagelist.
 *
 * Encodes either the v1 or v2 cap record depending on
 * recon_state->msg_version, optionally followed by file locks and the
 * snap_follows value (struct_v >= 2).  Resets the cap's seq/issue_seq/mseq
 * since the MDS session is being re-established.  Returns 0 on success and
 * bumps recon_state->nr_caps; returns a negative errno on failure.
 */
aaf67de7 3933static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
2f2dc053 3934{
20cb34ae
SW
3935	union {
3936		struct ceph_mds_cap_reconnect v2;
3937		struct ceph_mds_cap_reconnect_v1 v1;
3938	} rec;
aaf67de7 3939	struct ceph_inode_info *ci = ceph_inode(inode);
20cb34ae
SW
3940	struct ceph_reconnect_state *recon_state = arg;
3941	struct ceph_pagelist *pagelist = recon_state->pagelist;
a33f6432 3942	struct dentry *dentry;
aaf67de7 3943	struct ceph_cap *cap;
a33f6432 3944	char *path;
aaf67de7 3945	int pathlen = 0, err = 0;
a33f6432 3946	u64 pathbase;
3469ed0d 3947	u64 snap_follows;
2f2dc053 3948
a33f6432
YZ
	/* build the path by which the MDS will identify this inode */
3949	dentry = d_find_primary(inode);
3950	if (dentry) {
3951		/* set pathbase to parent dir when msg_version >= 2 */
3952		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3953						recon_state->msg_version >= 2);
3954		dput(dentry);
3955		if (IS_ERR(path)) {
3956			err = PTR_ERR(path);
3957			goto out_err;
3958		}
3959	} else {
3960		path = NULL;
a33f6432
YZ
3961		pathbase = 0;
3962	}
3963
be655596 3964	spin_lock(&ci->i_ceph_lock);
aaf67de7
XL
3965	cap = __get_cap_for_mds(ci, mds);
3966	if (!cap) {
		/* no cap for this mds anymore; nothing to encode */
3967		spin_unlock(&ci->i_ceph_lock);
3968		goto out_err;
3969	}
3970	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3971	     inode, ceph_vinop(inode), cap, cap->cap_id,
3972	     ceph_cap_string(cap->issued));
3973
2f2dc053
SW
3974	cap->seq = 0;        /* reset cap seq */
3975	cap->issue_seq = 0;  /* and issue_seq */
667ca05c 3976	cap->mseq = 0;       /* and migrate_seq */
52d60f8e 3977	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
20cb34ae 3978
a25949b9 3979	/* These are lost when the session goes away */
785892fe
JL
3980	if (S_ISDIR(inode->i_mode)) {
3981		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3982			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3983			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3984		}
a25949b9 3985		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
785892fe 3986	}
a25949b9 3987
	/* fill in the wire record (v2 for newer sessions, v1 otherwise) */
121f22a1 3988	if (recon_state->msg_version >= 2) {
20cb34ae
SW
3989		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3990		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3991		rec.v2.issued = cpu_to_le32(cap->issued);
3992		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
a33f6432 3993		rec.v2.pathbase = cpu_to_le64(pathbase);
ec1dff25
JL
		/* temporarily 0/1 flag here; replaced by the real lock blob length below */
3994		rec.v2.flock_len = (__force __le32)
3995			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
20cb34ae
SW
3996	} else {
3997		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3998		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3999		rec.v1.issued = cpu_to_le32(cap->issued);
2d6795fb 4000		rec.v1.size = cpu_to_le64(i_size_read(inode));
9bbeab41
AB
4001		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
4002		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
20cb34ae 4003		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
a33f6432 4004		rec.v1.pathbase = cpu_to_le64(pathbase);
20cb34ae 4005	}
3469ed0d
YZ
4006
4007	if (list_empty(&ci->i_cap_snaps)) {
92776fd2 4008		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3469ed0d
YZ
4009	} else {
4010		struct ceph_cap_snap *capsnap =
4011			list_first_entry(&ci->i_cap_snaps,
4012					 struct ceph_cap_snap, ci_item);
4013		snap_follows = capsnap->follows;
20cb34ae 4014	}
be655596 4015	spin_unlock(&ci->i_ceph_lock);
2f2dc053 4016
121f22a1 4017	if (recon_state->msg_version >= 2) {
40819f6f 4018		int num_fcntl_locks, num_flock_locks;
4deb14a2 4019		struct ceph_filelock *flocks = NULL;
81c5a148 4020		size_t struct_len, total_len = sizeof(u64);
121f22a1 4021		u8 struct_v = 0;
39be95e9
JS
4022
4023encode_again:
b3f8d68f
YZ
4024		if (rec.v2.flock_len) {
4025			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4026		} else {
4027			num_fcntl_locks = 0;
4028			num_flock_locks = 0;
4029		}
4deb14a2 4030		if (num_fcntl_locks + num_flock_locks > 0) {
6da2ec56
KC
4031			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4032					       sizeof(struct ceph_filelock),
4033					       GFP_NOFS);
4deb14a2
YZ
4034			if (!flocks) {
4035				err = -ENOMEM;
5ccedf1c 4036				goto out_err;
4deb14a2
YZ
4037			}
4038			err = ceph_encode_locks_to_buffer(inode, flocks,
4039							  num_fcntl_locks,
4040							  num_flock_locks);
4041			if (err) {
4042				kfree(flocks);
4043				flocks = NULL;
				/* lock count changed under us; recount and retry */
4044				if (err == -ENOSPC)
4045					goto encode_again;
5ccedf1c 4046				goto out_err;
4deb14a2
YZ
4047			}
4048		} else {
39be95e9 4049			kfree(flocks);
4deb14a2 4050			flocks = NULL;
39be95e9 4051		}
121f22a1
YZ
4052
4053		if (recon_state->msg_version >= 3) {
4054			/* version, compat_version and struct_len */
81c5a148 4055			total_len += 2 * sizeof(u8) + sizeof(u32);
3469ed0d 4056			struct_v = 2;
121f22a1 4057		}
39be95e9
JS
4058		/*
4059		 * number of encoded locks is stable, so copy to pagelist
4060		 */
121f22a1
YZ
4061		struct_len = 2 * sizeof(u32) +
4062			    (num_fcntl_locks + num_flock_locks) *
4063			    sizeof(struct ceph_filelock);
4064		rec.v2.flock_len = cpu_to_le32(struct_len);
4065
a33f6432 4066		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
121f22a1 4067
3469ed0d
YZ
4068		if (struct_v >= 2)
4069			struct_len += sizeof(u64); /* snap_follows */
4070
121f22a1 4071		total_len += struct_len;
81c5a148
YZ
4072
		/* flush a partial reconnect message if this record won't fit */
4073		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4074			err = send_reconnect_partial(recon_state);
4075			if (err)
4076				goto out_freeflocks;
4077			pagelist = recon_state->pagelist;
5ccedf1c 4078		}
121f22a1 4079
81c5a148
YZ
4080		err = ceph_pagelist_reserve(pagelist, total_len);
4081		if (err)
4082			goto out_freeflocks;
4083
4084		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
5ccedf1c
YZ
4085		if (recon_state->msg_version >= 3) {
4086			ceph_pagelist_encode_8(pagelist, struct_v);
4087			ceph_pagelist_encode_8(pagelist, 1);
4088			ceph_pagelist_encode_32(pagelist, struct_len);
121f22a1 4089		}
a33f6432 4090		ceph_pagelist_encode_string(pagelist, path, pathlen);
5ccedf1c
YZ
4091		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4092		ceph_locks_to_pagelist(flocks, pagelist,
4093				       num_fcntl_locks, num_flock_locks);
4094		if (struct_v >= 2)
4095			ceph_pagelist_encode_64(pagelist, snap_follows);
81c5a148 4096out_freeflocks:
39be95e9 4097		kfree(flocks);
3612abbd 4098	} else {
5ccedf1c 4099		err = ceph_pagelist_reserve(pagelist,
81c5a148
YZ
4100					    sizeof(u64) + sizeof(u32) +
4101					    pathlen + sizeof(rec.v1));
a33f6432
YZ
4102		if (err)
4103			goto out_err;
5ccedf1c 4104
81c5a148 4105		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
5ccedf1c
YZ
4106		ceph_pagelist_encode_string(pagelist, path, pathlen);
4107		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
40819f6f 4108	}
44c99757 4109
5ccedf1c 4110out_err:
a33f6432
YZ
4111	ceph_mdsc_free_path(path, pathlen);
4112	if (!err)
81c5a148
YZ
4113		recon_state->nr_caps++;
4114	return err;
4115}
4116
/*
 * Encode every snap realm known to the client into the reconnect pagelist.
 * For msg_version >= 4 each realm record is wrapped with a version /
 * compat_version / struct_len header and the total realm count is encoded
 * up front; oversized messages are split via send_reconnect_partial().
 * Bumps recon_state->nr_realms per realm; returns 0 or a negative errno.
 */
4117static int encode_snap_realms(struct ceph_mds_client *mdsc,
4118			      struct ceph_reconnect_state *recon_state)
4119{
4120	struct rb_node *p;
4121	struct ceph_pagelist *pagelist = recon_state->pagelist;
4122	int err = 0;
4123
4124	if (recon_state->msg_version >= 4) {
4125		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4126		if (err < 0)
4127			goto fail;
4128	}
4129
4130	/*
4131	 * snaprealms. we provide mds with the ino, seq (version), and
4132	 * parent for all of our realms.  If the mds has any newer info,
4133	 * it will tell us.
4134	 */
4135	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4136		struct ceph_snap_realm *realm =
4137		       rb_entry(p, struct ceph_snap_realm, node);
4138		struct ceph_mds_snaprealm_reconnect sr_rec;
4139
4140		if (recon_state->msg_version >= 4) {
4141			size_t need = sizeof(u8) * 2 + sizeof(u32) +
4142				      sizeof(sr_rec);
4143
			/* flush a partial message if this realm won't fit */
4144			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4145				err = send_reconnect_partial(recon_state);
4146				if (err)
4147					goto fail;
4148				pagelist = recon_state->pagelist;
4149			}
4150
4151			err = ceph_pagelist_reserve(pagelist, need);
4152			if (err)
4153				goto fail;
4154
4155			ceph_pagelist_encode_8(pagelist, 1);
4156			ceph_pagelist_encode_8(pagelist, 1);
4157			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4158		}
4159
4160		dout(" adding snap realm %llx seq %lld parent %llx\n",
4161		     realm->ino, realm->seq, realm->parent_ino);
4162		sr_rec.ino = cpu_to_le64(realm->ino);
4163		sr_rec.seq = cpu_to_le64(realm->seq);
4164		sr_rec.parent = cpu_to_le64(realm->parent_ino);
4165
4166		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4167		if (err)
4168			goto fail;
4169
4170		recon_state->nr_realms++;
4171	}
4172fail:
93cea5be 4173	return err;
2f2dc053
SW
4174}
4175
4176
4177/*
4178 * If an MDS fails and recovers, clients need to reconnect in order to
4179 * reestablish shared state. This includes all caps issued through
4180 * this session _and_ the snap_realm hierarchy. Because it's not
4181 * clear which snap realms the mds cares about, we send everything we
4182 * know about.. that ensures we'll then get any new info the
4183 * recovering MDS might have.
4184 *
4185 * This is a relatively heavyweight operation, but it's rare.
2f2dc053 4186 */
34b6c855
SW
/*
 * Re-establish state with a restarted MDS: bump the cap generation,
 * discard stale cap releases, reopen the connection, replay unsafe
 * requests, then encode all of this session's caps (reconnect_caps_cb)
 * and all snap realms (encode_snap_realms) into one or more
 * CEPH_MSG_CLIENT_RECONNECT messages and send them.  On failure the
 * prepared message and pagelist are released and an error is logged.
 */
4187static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4188			       struct ceph_mds_session *session)
2f2dc053 4189{
2f2dc053 4190	struct ceph_msg *reply;
34b6c855 4191	int mds = session->s_mds;
9abf82b8 4192	int err = -ENOMEM;
81c5a148
YZ
4193	struct ceph_reconnect_state recon_state = {
4194		.session = session,
4195	};
c8a96a31 4196	LIST_HEAD(dispose);
2f2dc053 4197
34b6c855 4198	pr_info("mds%d reconnect start\n", mds);
2f2dc053 4199
81c5a148
YZ
4200	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4201	if (!recon_state.pagelist)
93cea5be 4202		goto fail_nopagelist;
93cea5be 4203
0d9c1ab3 4204	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
a79832f2 4205	if (!reply)
93cea5be 4206		goto fail_nomsg;
93cea5be 4207
d4846487
JL
	/* delegated inode numbers are only valid for the old session instance */
4208	xa_destroy(&session->s_delegated_inos);
4209
34b6c855
SW
4210	mutex_lock(&session->s_mutex);
4211	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4212	session->s_seq = 0;
2f2dc053 4213
2f2dc053 4214	dout("session %p state %s\n", session,
a687ecaf 4215	     ceph_session_state_name(session->s_state));
2f2dc053 4216
	/* invalidate all previously issued dentry leases / cap generations */
52d60f8e 4217	atomic_inc(&session->s_cap_gen);
99a9c273
YZ
4218
4219	spin_lock(&session->s_cap_lock);
03f4fcb0
YZ
4220	/* don't know if session is readonly */
4221	session->s_readonly = 0;
99a9c273
YZ
4222	/*
4223	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4224	 * If a cap get released before being added to the cap reconnect,
4225	 * __ceph_remove_cap() should skip queuing cap release.
4226	 */
4227	session->s_cap_reconnect = 1;
e01a5946 4228	/* drop old cap expires; we're about to reestablish that state */
c8a96a31
JL
4229	detach_cap_releases(session, &dispose);
4230	spin_unlock(&session->s_cap_lock);
4231	dispose_cap_releases(mdsc, &dispose);
e01a5946 4232
5d23371f 4233	/* trim unused caps to reduce MDS's cache rejoin time */
c0bd50e2
YZ
4234	if (mdsc->fsc->sb->s_root)
4235		shrink_dcache_parent(mdsc->fsc->sb->s_root);
5d23371f
YZ
4236
4237	ceph_con_close(&session->s_con);
4238	ceph_con_open(&session->s_con,
4239		      CEPH_ENTITY_TYPE_MDS, mds,
4240		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4241
4242	/* replay unsafe requests */
4243	replay_unsafe_requests(mdsc, session);
4244
81c5a148
YZ
4245	ceph_early_kick_flushing_caps(mdsc, session);
4246
5d23371f
YZ
4247	down_read(&mdsc->snap_rwsem);
4248
81c5a148
YZ
4249	/* placeholder for nr_caps */
4250	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
93cea5be
SW
4251	if (err)
4252		goto fail;
20cb34ae 4253
	/* pick the reconnect encoding version based on peer capabilities */
81c5a148 4254	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
121f22a1 4255		recon_state.msg_version = 3;
81c5a148
YZ
4256		recon_state.allow_multi = true;
4257	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4258		recon_state.msg_version = 3;
4259	} else {
23c625ce 4260		recon_state.msg_version = 2;
81c5a148
YZ
4261	}
4262	/* traverse this session's caps */
a25949b9 4263	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
2f2dc053 4264
99a9c273
YZ
4265	spin_lock(&session->s_cap_lock);
4266	session->s_cap_reconnect = 0;
4267	spin_unlock(&session->s_cap_lock);
4268
81c5a148
YZ
4269	if (err < 0)
4270		goto fail;
2f2dc053 4271
81c5a148
YZ
4272	/* check if all realms can be encoded into current message */
4273	if (mdsc->num_snap_realms) {
4274		size_t total_len =
4275			recon_state.pagelist->length +
4276			mdsc->num_snap_realms *
4277			sizeof(struct ceph_mds_snaprealm_reconnect);
4278		if (recon_state.msg_version >= 4) {
4279			/* number of realms */
4280			total_len += sizeof(u32);
4281			/* version, compat_version and struct_len */
4282			total_len += mdsc->num_snap_realms *
4283				     (2 * sizeof(u8) + sizeof(u32));
4284		}
4285		if (total_len > RECONNECT_MAX_SIZE) {
			/* must split, which requires MULTI_RECONNECT support */
4286			if (!recon_state.allow_multi) {
4287				err = -ENOSPC;
4288				goto fail;
4289			}
4290			if (recon_state.nr_caps) {
4291				err = send_reconnect_partial(&recon_state);
4292				if (err)
4293					goto fail;
4294			}
4295			recon_state.msg_version = 5;
4296		}
2f2dc053 4297	}
2f2dc053 4298
81c5a148
YZ
4299	err = encode_snap_realms(mdsc, &recon_state);
4300	if (err < 0)
4301		goto fail;
4302
4303	if (recon_state.msg_version >= 5) {
4304		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4305		if (err < 0)
4306			goto fail;
4307	}
44c99757 4308
	/* backfill the nr_caps / nr_realms placeholder at the message head */
81c5a148
YZ
4309	if (recon_state.nr_caps || recon_state.nr_realms) {
4310		struct page *page =
4311			list_first_entry(&recon_state.pagelist->head,
4312					struct page, lru);
44c99757 4313		__le32 *addr = kmap_atomic(page);
81c5a148
YZ
4314		if (recon_state.nr_caps) {
4315			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4316			*addr = cpu_to_le32(recon_state.nr_caps);
4317		} else if (recon_state.msg_version >= 4) {
4318			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4319		}
44c99757 4320		kunmap_atomic(addr);
ebf18f47 4321	}
44c99757 4322
81c5a148
YZ
4323	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4324	if (recon_state.msg_version >= 4)
4325		reply->hdr.compat_version = cpu_to_le16(4);
e548e9b9 4326
81c5a148
YZ
4327	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4328	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
e548e9b9 4329
2f2dc053
SW
4330	ceph_con_send(&session->s_con, reply);
4331
9abf82b8
SW
4332	mutex_unlock(&session->s_mutex);
4333
4334	mutex_lock(&mdsc->mutex);
4335	__wake_requests(mdsc, &session->s_waiting);
4336	mutex_unlock(&mdsc->mutex);
4337
2f2dc053 4338	up_read(&mdsc->snap_rwsem);
81c5a148 4339	ceph_pagelist_release(recon_state.pagelist);
2f2dc053
SW
4340	return;
4341
93cea5be 4342fail:
2f2dc053 4343	ceph_msg_put(reply);
9abf82b8
SW
4344	up_read(&mdsc->snap_rwsem);
4345	mutex_unlock(&session->s_mutex);
93cea5be 4346fail_nomsg:
81c5a148 4347	ceph_pagelist_release(recon_state.pagelist);
93cea5be 4348fail_nopagelist:
9abf82b8 4349	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
9abf82b8 4350	return;
2f2dc053
SW
4351}
4352
4353
4354/*
4355 * compare old and new mdsmaps, kicking requests
4356 * and closing out old connections as necessary
4357 *
4358 * called under mdsc->mutex.
4359 */
4360static void check_new_map(struct ceph_mds_client *mdsc,
4361			  struct ceph_mdsmap *newmap,
4362			  struct ceph_mdsmap *oldmap)
4363{
d517b398 4364	int i, j, err;
2f2dc053
SW
4365	int oldstate, newstate;
4366	struct ceph_mds_session *s;
	/*
	 * NOTE(review): dividing by sizeof(unsigned long) (bytes) rather than
	 * BITS_PER_LONG makes this bitmap larger than needed, not too small —
	 * harmless, but BITS_TO_LONGS(CEPH_MAX_MDS) would be the exact size;
	 * confirm against upstream before changing.
	 */
d517b398 4367	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
2f2dc053
SW
4368
4369	dout("check_new_map new %u old %u\n",
4370	     newmap->m_epoch, oldmap->m_epoch);
4371
	/* collect the union of all export targets in the new map */
d517b398
XL
4372	if (newmap->m_info) {
4373		for (i = 0; i < newmap->possible_max_rank; i++) {
4374			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4375				set_bit(newmap->m_info[i].export_targets[j], targets);
4376		}
4377	}
4378
	/* walk existing sessions, reconciling each with its new map state */
b38c9eb4 4379	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
d37b1d99 4380		if (!mdsc->sessions[i])
2f2dc053
SW
4381			continue;
4382		s = mdsc->sessions[i];
4383		oldstate = ceph_mdsmap_get_state(oldmap, i);
4384		newstate = ceph_mdsmap_get_state(newmap, i);
4385
0deb01c9 4386		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2f2dc053 4387		     i, ceph_mds_state_name(oldstate),
0deb01c9 4388		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2f2dc053 4389		     ceph_mds_state_name(newstate),
0deb01c9 4390		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
a687ecaf 4391		     ceph_session_state_name(s->s_state));
2f2dc053 4392
b38c9eb4 4393		if (i >= newmap->possible_max_rank) {
6f0f597b 4394			/* force close session for stopped mds */
5b3248c6 4395			ceph_get_mds_session(s);
6f0f597b
YZ
4396			__unregister_session(mdsc, s);
4397			__wake_requests(mdsc, &s->s_waiting);
4398			mutex_unlock(&mdsc->mutex);
2827528d
6f0f597b
YZ
4399			mutex_lock(&s->s_mutex);
4400			cleanup_session_requests(mdsc, s);
4401			remove_session_caps(s);
4402			mutex_unlock(&s->s_mutex);
2827528d
6f0f597b 4405			ceph_put_mds_session(s);
2827528d
6f0f597b
YZ
4407			mutex_lock(&mdsc->mutex);
4408			kick_requests(mdsc, i);
4409			continue;
4410		}
4411
4412		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4413			   ceph_mdsmap_get_addr(newmap, i),
4414			   sizeof(struct ceph_entity_addr))) {
4415			/* just close it */
4416			mutex_unlock(&mdsc->mutex);
4417			mutex_lock(&s->s_mutex);
4418			mutex_lock(&mdsc->mutex);
4419			ceph_con_close(&s->s_con);
4420			mutex_unlock(&s->s_mutex);
4421			s->s_state = CEPH_MDS_SESSION_RESTARTING;
2f2dc053
SW
4422		} else if (oldstate == newstate) {
4423			continue;  /* nothing new with this mds */
4424		}
4425
4426		/*
4427		 * send reconnect?
4428		 */
4429		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
34b6c855
SW
4430		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4431			mutex_unlock(&mdsc->mutex);
			/* clear the target bit: this rank is reconnected here */
d517b398 4432			clear_bit(i, targets);
34b6c855
SW
4433			send_mds_reconnect(mdsc, s);
4434			mutex_lock(&mdsc->mutex);
4435		}
2f2dc053
SW
4436
4437		/*
29790f26 4438		 * kick request on any mds that has gone active.
2f2dc053
SW
4439		 */
4440		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4441		    newstate >= CEPH_MDS_STATE_ACTIVE) {
29790f26
SW
4442			if (oldstate != CEPH_MDS_STATE_CREATING &&
4443			    oldstate != CEPH_MDS_STATE_STARTING)
4444				pr_info("mds%d recovery completed\n", s->s_mds);
4445			kick_requests(mdsc, i);
ea8412b2 4446			mutex_unlock(&mdsc->mutex);
829ad4db 4447			mutex_lock(&s->s_mutex);
ea8412b2 4448			mutex_lock(&mdsc->mutex);
2f2dc053 4449			ceph_kick_flushing_caps(mdsc, s);
829ad4db 4450			mutex_unlock(&s->s_mutex);
d2f8bb27 4451			wake_up_session_caps(s, RECONNECT);
2f2dc053
SW
4452		}
4453	}
cb170a22
d517b398
XL
4455	/*
4456	 * Only open and reconnect sessions that don't exist yet.
4457	 */
4458	for (i = 0; i < newmap->possible_max_rank; i++) {
4459		/*
4460		 * In case the import MDS is crashed just after
4461		 * the EImportStart journal is flushed, so when
4462		 * a standby MDS takes over it and is replaying
4463		 * the EImportStart journal the new MDS daemon
4464		 * will wait the client to reconnect it, but the
4465		 * client may never register/open the session yet.
4466		 *
4467		 * Will try to reconnect that MDS daemon if the
4468		 * rank number is in the export targets array and
4469		 * is the up:reconnect state.
4470		 */
4471		newstate = ceph_mdsmap_get_state(newmap, i);
4472		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4473			continue;
4474
4475		/*
4476		 * The session maybe registered and opened by some
4477		 * requests which were choosing random MDSes during
4478		 * the mdsc->mutex's unlock/lock gap below in rare
4479		 * case. But the related MDS daemon will just queue
4480		 * that requests and be still waiting for the client's
4481		 * reconnection request in up:reconnect state.
4482		 */
4483		s = __ceph_lookup_mds_session(mdsc, i);
4484		if (likely(!s)) {
4485			s = __open_export_target_session(mdsc, i);
4486			if (IS_ERR(s)) {
4487				err = PTR_ERR(s);
4488				pr_err("failed to open export target session, err %d\n",
4489				       err);
4490				continue;
4491			}
4492		}
4493		dout("send reconnect to export target mds.%d\n", i);
4494		mutex_unlock(&mdsc->mutex);
4495		send_mds_reconnect(mdsc, s);
4496		ceph_put_mds_session(s);
4497		mutex_lock(&mdsc->mutex);
4498	}
4499
	/* pre-open sessions to export targets of any laggy mds */
b38c9eb4 4500	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
cb170a22
SW
4501		s = mdsc->sessions[i];
4502		if (!s)
4503			continue;
4504		if (!ceph_mdsmap_is_laggy(newmap, i))
4505			continue;
4506		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4507		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4508		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4509			dout(" connecting to export targets of laggy mds%d\n",
4510			     i);
4511			__open_export_target_sessions(mdsc, s);
4512		}
4513	}
2f2dc053
SW
4514}
4515
4516
4517
4518/*
4519 * leases
4520 */
4521
4522/*
4523 * caller must hold session s_mutex, dentry->d_lock
4524 */
/*
 * Drop @dentry's MDS lease: release the session reference held by its
 * ceph_dentry_info and clear the pointer.
 * Caller must hold session s_mutex and dentry->d_lock (see comment above).
 */
4525void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4526{
4527	struct ceph_dentry_info *di = ceph_dentry(dentry);
4528
4529	ceph_put_mds_session(di->lease_session);
4530	di->lease_session = NULL;
4531}
4532
2600d2dd
SW
/*
 * Handle an incoming CEPH_MSG_CLIENT_LEASE message from the MDS:
 * decode the (ino, dentry name) it refers to, then either drop the
 * dentry lease (REVOKE) or extend it (RENEW).  Revokes — and lookups
 * that find no inode/dentry — are acknowledged by reusing the same
 * message as a REVOKE_ACK.  Malformed messages are logged and dumped.
 */
4533static void handle_lease(struct ceph_mds_client *mdsc,
4534			 struct ceph_mds_session *session,
4535			 struct ceph_msg *msg)
2f2dc053 4536{
3d14c5d2 4537	struct super_block *sb = mdsc->fsc->sb;
2f2dc053 4538	struct inode *inode;
2f2dc053
SW
4539	struct dentry *parent, *dentry;
4540	struct ceph_dentry_info *di;
2600d2dd 4541	int mds = session->s_mds;
2f2dc053 4542	struct ceph_mds_lease *h = msg->front.iov_base;
1e5ea23d 4543	u32 seq;
2f2dc053 4544	struct ceph_vino vino;
2f2dc053
SW
4545	struct qstr dname;
4546	int release = 0;
4547
2f2dc053
SW
4548	dout("handle_lease from mds%d\n", mds);
4549
4550	/* decode */
4551	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4552		goto bad;
4553	vino.ino = le64_to_cpu(h->ino);
4554	vino.snap = CEPH_NOSNAP;
1e5ea23d 4555	seq = le32_to_cpu(h->seq);
0fcf6c02
YZ
	/* dentry name (length-prefixed) immediately follows the header */
4556	dname.len = get_unaligned_le32(h + 1);
4557	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
2f2dc053 4558		goto bad;
0fcf6c02 4559	dname.name = (void *)(h + 1) + sizeof(u32);
2f2dc053 4560
2f2dc053
SW
4561	/* lookup inode */
4562	inode = ceph_find_inode(sb, vino);
2f90b852
SW
4563	dout("handle_lease %s, ino %llx %p %.*s\n",
4564	     ceph_lease_op_name(h->action), vino.ino, inode,
1e5ea23d 4565	     dname.len, dname.name);
6cd3bcad
YZ
4566
4567	mutex_lock(&session->s_mutex);
62575e27 4568	inc_session_sequence(session);
6cd3bcad 4569
d37b1d99 4570	if (!inode) {
2f2dc053
SW
4571		dout("handle_lease no inode %llx\n", vino.ino);
4572		goto release;
4573	}
2f2dc053
SW
4574
4575	/* dentry */
4576	parent = d_find_alias(inode);
4577	if (!parent) {
4578		dout("no parent dentry on inode %p\n", inode);
4579		WARN_ON(1);
4580		goto release;  /* hrm... */
4581	}
8387ff25 4582	dname.hash = full_name_hash(parent, dname.name, dname.len);
2f2dc053
SW
4583	dentry = d_lookup(parent, &dname);
4584	dput(parent);
4585	if (!dentry)
4586		goto release;
4587
4588	spin_lock(&dentry->d_lock);
4589	di = ceph_dentry(dentry);
4590	switch (h->action) {
4591	case CEPH_MDS_LEASE_REVOKE:
3d8eb7a9 4592		if (di->lease_session == session) {
1e5ea23d
SW
			/* report our newer seq back in the ack if we have one */
4593			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4594				h->seq = cpu_to_le32(di->lease_seq);
2f2dc053
SW
4595			__ceph_mdsc_drop_dentry_lease(dentry);
4596		}
4597		release = 1;
4598		break;
4599
4600	case CEPH_MDS_LEASE_RENEW:
3d8eb7a9 4601		if (di->lease_session == session &&
52d60f8e 4602		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
2f2dc053
SW
4603		    di->lease_renew_from &&
4604		    di->lease_renew_after == 0) {
4605			unsigned long duration =
3563dbdd 4606				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
2f2dc053 4607
1e5ea23d 4608			di->lease_seq = seq;
9b16f03c 4609			di->time = di->lease_renew_from + duration;
2f2dc053
SW
4610			di->lease_renew_after = di->lease_renew_from +
4611				(duration >> 1);
4612			di->lease_renew_from = 0;
4613		}
4614		break;
4615	}
4616	spin_unlock(&dentry->d_lock);
4617	dput(dentry);
4618
4619	if (!release)
4620		goto out;
4621
4622release:
4623	/* let's just reuse the same message */
4624	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4625	ceph_msg_get(msg);
4626	ceph_con_send(&session->s_con, msg);
4627
4628out:
2f2dc053 4629	mutex_unlock(&session->s_mutex);
23c2c76e 4630	iput(inode);
2f2dc053
SW
4631	return;

4633bad:
4634	pr_err("corrupt lease message\n");
9ec7cab1 4635	ceph_msg_dump(msg);
2f2dc053
SW
4636}
4637
/*
 * Build and send a CEPH_MSG_CLIENT_LEASE message for @dentry to the
 * session's MDS, carrying @action (e.g. release/renew) and @seq.  The
 * parent directory's ino/snap and the dentry name are captured under
 * dentry->d_lock.  Allocation failure is silently ignored (best effort).
 */
4638void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2f2dc053
SW
4639			      struct dentry *dentry, char action,
4640			      u32 seq)
4641{
4642	struct ceph_msg *msg;
4643	struct ceph_mds_lease *lease;
8f2a98ef
YZ
4644	struct inode *dir;
	/* worst-case message size: header + name length + longest name */
4645	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
2f2dc053 4646
8f2a98ef
YZ
4647	dout("lease_send_msg identry %p %s to mds%d\n",
4648	     dentry, ceph_lease_op_name(action), session->s_mds);
2f2dc053 4649
b61c2763 4650	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
a79832f2 4651	if (!msg)
2f2dc053
SW
4652		return;
4653	lease = msg->front.iov_base;
4654	lease->action = action;
2f2dc053 4655	lease->seq = cpu_to_le32(seq);
2f2dc053 4656
8f2a98ef
YZ
4657	spin_lock(&dentry->d_lock);
4658	dir = d_inode(dentry->d_parent);
4659	lease->ino = cpu_to_le64(ceph_ino(dir));
4660	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4661
4662	put_unaligned_le32(dentry->d_name.len, lease + 1);
4663	memcpy((void *)(lease + 1) + 4,
4664	       dentry->d_name.name, dentry->d_name.len);
4665	spin_unlock(&dentry->d_lock);
2f2dc053
SW
4666
4667	ceph_con_send(&session->s_con, msg);
4668}
4669
2f2dc053 4670/*
59b312f3 4671 * lock unlock the session, to wait ongoing session activities
2f2dc053 4672 */
59b312f3 4673static void lock_unlock_session(struct ceph_mds_session *s)
2f2dc053 4674{
59b312f3
XL
4675 mutex_lock(&s->s_mutex);
4676 mutex_unlock(&s->s_mutex);
2f2dc053
SW
4677}
4678
131d7eb4
YZ
/*
 * If the mount has the CLEANRECOVER option, is still mounted, and the
 * client has been blocklisted by the cluster, force a reconnect to
 * recover the session automatically.  Otherwise do nothing.
 */
4679static void maybe_recover_session(struct ceph_mds_client *mdsc)
4680{
4681	struct ceph_fs_client *fsc = mdsc->fsc;
4682
4683	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4684		return;
2f2dc053
131d7eb4
YZ
4686	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4687		return;
4688
0b98acd6 4689	if (!READ_ONCE(fsc->blocklisted))
131d7eb4
YZ
4690		return;
4691
0b98acd6 4692	pr_info("auto reconnect after blocklisted\n");
131d7eb4
YZ
4693	ceph_force_reconnect(fsc->sb);
4694}
2f2dc053 4695
3e699bd8
XL
/*
 * Return true if the session is in a state worth keeping alive
 * (and mark an OPEN session HUNG when its ttl has expired); return
 * false for sessions that are closing, new, restarting, closed or
 * rejected, so callers skip them.
 */
4696bool check_session_state(struct ceph_mds_session *s)
4697{
62575e27
JL
4698	switch (s->s_state) {
4699	case CEPH_MDS_SESSION_OPEN:
4700		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3e699bd8
XL
4701			s->s_state = CEPH_MDS_SESSION_HUNG;
4702			pr_info("mds%d hung\n", s->s_mds);
4703		}
62575e27
JL
4704		break;
4705	case CEPH_MDS_SESSION_CLOSING:
62575e27
JL
4706	case CEPH_MDS_SESSION_NEW:
4707	case CEPH_MDS_SESSION_RESTARTING:
4708	case CEPH_MDS_SESSION_CLOSED:
4709	case CEPH_MDS_SESSION_REJECTED:
3e699bd8 4710		return false;
62575e27 4711	}
3e699bd8
XL
4712
4713	return true;
4714}
4715
62575e27
JL
4716/*
4717 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4718 * then we need to retransmit that request.
4719 */
/*
 * Bump the session sequence number (caller holds s_mutex).  If a
 * REQUEST_CLOSE is in flight (state CLOSING), the sequence bump
 * invalidates the pending close, so retransmit it — see the comment
 * above this function.
 */
4720void inc_session_sequence(struct ceph_mds_session *s)
4721{
4722	lockdep_assert_held(&s->s_mutex);
4723
4724	s->s_seq++;
4725
4726	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4727		int ret;
4728
4729		dout("resending session close request for mds%d\n", s->s_mds);
4730		ret = request_close_session(s);
4731		if (ret < 0)
4732			pr_err("unable to close session to mds%d: %d\n",
4733			       s->s_mds, ret);
4734	}
4735}
4736
2f2dc053 4737/*
bf2ba432
LH
4738 * delayed work -- periodically trim expired leases, renew caps with mds. If
4739 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4740 * workqueue delay value of 5 secs will be used.
2f2dc053 4741 */
bf2ba432 4742static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
2f2dc053 4743{
bf2ba432
LH
4744 unsigned long max_delay = HZ * 5;
4745
4746 /* 5 secs default delay */
4747 if (!delay || (delay > max_delay))
4748 delay = max_delay;
4749 schedule_delayed_work(&mdsc->delayed_work,
4750 round_jiffies_relative(delay));
2f2dc053
SW
4751}
4752
/*
 * Periodic housekeeping work for the MDS client: renew caps with each
 * live session (or just keepalive), send queued cap releases, flush
 * delayed caps, reclaim caps, trim the snapid map, and possibly
 * auto-recover a blocklisted session.  Re-arms itself via
 * schedule_delayed() unless the client is stopping.
 */
4753static void delayed_work(struct work_struct *work)
4754{
2f2dc053
SW
4755	struct ceph_mds_client *mdsc =
4756		container_of(work, struct ceph_mds_client, delayed_work.work);
bf2ba432 4757	unsigned long delay;
2f2dc053
SW
4758	int renew_interval;
4759	int renew_caps;
bf2ba432 4760	int i;
2f2dc053
SW
4761
4762	dout("mdsc delayed_work\n");
75c9627e
fa996773
XL
4764	if (mdsc->stopping)
4765		return;
4766
2f2dc053
SW
4767	mutex_lock(&mdsc->mutex);
	/* renew caps once per quarter of the session timeout */
4768	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4769	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4770				   mdsc->last_renew_caps);
4771	if (renew_caps)
4772		mdsc->last_renew_caps = jiffies;
4773
4774	for (i = 0; i < mdsc->max_sessions; i++) {
4775		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
d37b1d99 4776		if (!s)
2f2dc053 4777			continue;
3e699bd8
XL
4778
4779		if (!check_session_state(s)) {
2f2dc053
SW
4780			ceph_put_mds_session(s);
4781			continue;
4782		}
		/* drop mdsc->mutex while talking to the session */
4783		mutex_unlock(&mdsc->mutex);
4784
4785		mutex_lock(&s->s_mutex);
4786		if (renew_caps)
4787			send_renew_caps(mdsc, s);
4788		else
4789			ceph_con_keepalive(&s->s_con);
aab53dd9
SW
4790		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4791		    s->s_state == CEPH_MDS_SESSION_HUNG)
3d7ded4d 4792			ceph_send_cap_releases(mdsc, s);
2f2dc053
SW
4793		mutex_unlock(&s->s_mutex);
4794		ceph_put_mds_session(s);
4795
4796		mutex_lock(&mdsc->mutex);
4797	}
4798	mutex_unlock(&mdsc->mutex);
4799
	/* next delay is driven by the earliest pending delayed cap */
bf2ba432 4800	delay = ceph_check_delayed_caps(mdsc);
37c4efc1
YZ
4801
4802	ceph_queue_cap_reclaim_work(mdsc);
4803
4804	ceph_trim_snapid_map(mdsc);
4805
131d7eb4
YZ
4806	maybe_recover_session(mdsc);
4807
bf2ba432 4808	schedule_delayed(mdsc, delay);
2f2dc053
SW
4809}
4810
3d14c5d2 4811int ceph_mdsc_init(struct ceph_fs_client *fsc)
2f2dc053 4812
2f2dc053 4813{
3d14c5d2 4814 struct ceph_mds_client *mdsc;
f9009efa 4815 int err;
3d14c5d2
YS
4816
4817 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4818 if (!mdsc)
4819 return -ENOMEM;
4820 mdsc->fsc = fsc;
2f2dc053
SW
4821 mutex_init(&mdsc->mutex);
4822 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
d37b1d99 4823 if (!mdsc->mdsmap) {
f9009efa
XL
4824 err = -ENOMEM;
4825 goto err_mdsc;
fb3101b6 4826 }
2d06eeb8 4827
2f2dc053 4828 init_completion(&mdsc->safe_umount_waiters);
f3c60c59 4829 init_waitqueue_head(&mdsc->session_close_wq);
2f2dc053 4830 INIT_LIST_HEAD(&mdsc->waiting_for_map);
0c44a8e0
LH
4831 mdsc->quotarealms_inodes = RB_ROOT;
4832 mutex_init(&mdsc->quotarealms_inodes_mutex);
2f2dc053 4833 init_rwsem(&mdsc->snap_rwsem);
a105f00c 4834 mdsc->snap_realms = RB_ROOT;
2f2dc053
SW
4835 INIT_LIST_HEAD(&mdsc->snap_empty);
4836 spin_lock_init(&mdsc->snap_empty_lock);
44ca18f2 4837 mdsc->request_tree = RB_ROOT;
2f2dc053
SW
4838 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4839 mdsc->last_renew_caps = jiffies;
4840 INIT_LIST_HEAD(&mdsc->cap_delay_list);
3a3430af 4841 INIT_LIST_HEAD(&mdsc->cap_wait_list);
2f2dc053
SW
4842 spin_lock_init(&mdsc->cap_delay_lock);
4843 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4844 spin_lock_init(&mdsc->snap_flush_lock);
553adfd9 4845 mdsc->last_cap_flush_tid = 1;
e4500b5e 4846 INIT_LIST_HEAD(&mdsc->cap_flush_list);
db354052 4847 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
2f2dc053
SW
4848 spin_lock_init(&mdsc->cap_dirty_lock);
4849 init_waitqueue_head(&mdsc->cap_flushing_wq);
37c4efc1 4850 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
f9009efa
XL
4851 err = ceph_metric_init(&mdsc->metric);
4852 if (err)
4853 goto err_mdsmap;
37c4efc1
YZ
4854
4855 spin_lock_init(&mdsc->dentry_list_lock);
4856 INIT_LIST_HEAD(&mdsc->dentry_leases);
4857 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
2d06eeb8 4858
37151668 4859 ceph_caps_init(mdsc);
fe33032d 4860 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
37151668 4861
75c9627e
YZ
4862 spin_lock_init(&mdsc->snapid_map_lock);
4863 mdsc->snapid_map_tree = RB_ROOT;
4864 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4865
10183a69
YZ
4866 init_rwsem(&mdsc->pool_perm_rwsem);
4867 mdsc->pool_perm_tree = RB_ROOT;
4868
dfeb84d4
YZ
4869 strscpy(mdsc->nodename, utsname()->nodename,
4870 sizeof(mdsc->nodename));
a7caa88f
XL
4871
4872 fsc->mdsc = mdsc;
5f44f142 4873 return 0;
f9009efa
XL
4874
4875err_mdsmap:
4876 kfree(mdsc->mdsmap);
4877err_mdsc:
4878 kfree(mdsc);
4879 return err;
2f2dc053
SW
4880}
4881
/*
 * Wait for safe replies on open mds requests. If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		/*
		 * Drop mdsc->mutex while sleeping; safe_umount_waiters is
		 * completed as safe replies arrive, waking us early.
		 */
		mutex_unlock(&mdsc->mutex);

		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}
4911
d095559c
XL
4912void send_flush_mdlog(struct ceph_mds_session *s)
4913{
4914 struct ceph_msg *msg;
4915
4916 /*
4917 * Pre-luminous MDS crashes when it sees an unknown session request
4918 */
4919 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4920 return;
4921
4922 mutex_lock(&s->s_mutex);
4923 dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
4924 ceph_session_state_name(s->s_state), s->s_seq);
4925 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4926 s->s_seq);
4927 if (!msg) {
4928 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4929 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4930 } else {
4931 ceph_con_send(&s->s_con, msg);
4932 }
4933 mutex_unlock(&s->s_mutex);
4934}
4935
/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	/* refuse new work from here on */
	mdsc->stopping = 1;

	/* flush MDS journals, then release per-session state */
	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
}
4958
/*
 * flush the mdlog and wait for all write mds requests to flush.
 *
 * Walks the request rbtree in tid order up to want_tid.  For each write
 * op we pin the current and next request (and the session) with refs so
 * we can drop mdsc->mutex while flushing/waiting, then revalidate the
 * next node on reacquire.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("%s want %lld\n", __func__, want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				/* unsubmitted request: nothing to wait on */
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				/* flush each session's mdlog only once */
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			dout("%s wait on %llu (want %llu)\n", __func__,
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	dout("%s done\n", __func__);
}
5025
/*
 * Flush all dirty caps and wait for both unsafe MDS requests (up to the
 * current last_tid) and in-flight cap flushes to complete.
 */
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	dout("sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		/* ask to be woken when the last queued flush completes */
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	dout("sync want tid %lld flush_seq %lld\n",
	     want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
5055
f3c60c59
SW
5056/*
5057 * true if all sessions are closed, or we force unmount
5058 */
fcff415c 5059static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
f3c60c59 5060{
52953d55 5061 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
f3c60c59 5062 return true;
fcff415c 5063 return atomic_read(&mdsc->num_sessions) <= skipped;
f3c60c59 5064}
2f2dc053
SW
5065
/*
 * called after sb is ro or when metadata corrupted.
 *
 * Politely closes every session, waits (bounded by mount_timeout) for
 * the MDSes to acknowledge, then forcibly tears down whatever remains.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	dout("close_sessions\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		/* drop mdsc->mutex before taking s_mutex (lock order) */
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		/* sessions we could not ask to close won't signal us */
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	dout("waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}
5124
/*
 * Forcibly close all sessions and kick pending requests so a hung
 * mount can be torn down without waiting for the MDSes.
 */
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	dout("force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		/* resubmit this mds's requests elsewhere */
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
5158
/* Release mdsc resources; the delayed work must be fully quiesced first. */
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	dout("stop\n");
	/*
	 * Make sure the delayed work stopped before releasing
	 * the resources.
	 *
	 * Because the cancel_delayed_work_sync() will only
	 * guarantee that the work finishes executing. But the
	 * delayed work will re-arm itself again after that.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
}
5178
/* Final teardown of the MDS client; frees mdsc and clears fsc->mdsc. */
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	dout("mdsc_destroy %p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	/* NOTE(review): prints only the stale pointer value, not a deref */
	dout("mdsc_destroy %p done\n", mdsc);
}
5198
/*
 * Handle an FSMap message from the monitor: find the fscid matching the
 * configured mds_namespace and subscribe to its MDS map.  On decode
 * failure the mount is shut down.
 */
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	dout("handle_fsmap epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		// info_v, info_cv
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		/* decode within this fs entry; advance p past it */
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		/* requested namespace not present in the map */
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
	ceph_umount_begin(mdsc->fsc->sb);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
2f2dc053
SW
5266
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		/* stale or duplicate map; ignore */
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
					MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
	ceph_umount_begin(mdsc->fsc->sb);
	return;
}
5330
4972cf60 5331static struct ceph_connection *mds_get_con(struct ceph_connection *con)
2f2dc053
SW
5332{
5333 struct ceph_mds_session *s = con->private;
5334
5b3248c6 5335 if (ceph_get_mds_session(s))
2f2dc053 5336 return con;
2f2dc053
SW
5337 return NULL;
5338}
5339
4972cf60 5340static void mds_put_con(struct ceph_connection *con)
2f2dc053
SW
5341{
5342 struct ceph_mds_session *s = con->private;
5343
2f2dc053
SW
5344 ceph_put_mds_session(s);
5345}
5346
/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn("mds%d closed our session\n", s->s_mds);
	/* don't try to reconnect when I/O has been fenced off */
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO)
		send_mds_reconnect(mdsc, s);
}
5360
/*
 * Dispatch an incoming message from an MDS connection to the proper
 * handler.  Consumes the msg reference in all cases.
 */
static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	int type = le16_to_cpu(msg->hdr.type);

	/* ignore messages for sessions we have already unregistered */
	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
5410
/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately. Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}
5435
/* Respond to an authorizer challenge from the MDS (msgr1). */
static int mds_add_authorizer_challenge(struct ceph_connection *con,
					void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}
4e7a5dcd 5446
/* Verify the MDS's reply to our authorizer (msgr1). */
static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}
5458
/* Invalidate cached MDS authorizers and revalidate auth with the mons. */
static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}
5469
/* msgr2: build the initial auth request; hand back the authorizer buf. */
static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}
5488
/* msgr2: continue a multi-round auth exchange with the service. */
static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}
5508
/* msgr2: finish auth; extract session key and connection secret. */
static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}
5522
/* msgr2: the MDS rejected our auth method; revalidate or give up. */
static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}
5543
53ded495
AE
5544static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5545 struct ceph_msg_header *hdr, int *skip)
5546{
5547 struct ceph_msg *msg;
5548 int type = (int) le16_to_cpu(hdr->type);
5549 int front_len = (int) le32_to_cpu(hdr->front_len);
5550
5551 if (con->in_msg)
5552 return con->in_msg;
5553
5554 *skip = 0;
5555 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5556 if (!msg) {
5557 pr_err("unable to allocate msg type %d len %d\n",
5558 type, front_len);
5559 return NULL;
5560 }
53ded495
AE
5561
5562 return msg;
5563}
5564
/* Sign an outgoing message with this session's auth handshake. */
static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}
5572
/* Verify the signature on an incoming message for this session. */
static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}
5580
/* Messenger callbacks for MDS connections (both msgr1 and msgr2 hooks). */
static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};
5598
2f2dc053 5599/* eof */