ceph: add metadata perf metric support
[linux-block.git] / fs / ceph / mds_client.c
CommitLineData
b2441318 1// SPDX-License-Identifier: GPL-2.0
3d14c5d2 2#include <linux/ceph/ceph_debug.h>
2f2dc053 3
496e5955 4#include <linux/fs.h>
2f2dc053 5#include <linux/wait.h>
5a0e3ad6 6#include <linux/slab.h>
54008399 7#include <linux/gfp.h>
2f2dc053 8#include <linux/sched.h>
3d14c5d2
YS
9#include <linux/debugfs.h>
10#include <linux/seq_file.h>
3e0708b9 11#include <linux/ratelimit.h>
9ba1e224 12#include <linux/bits.h>
70c94820 13#include <linux/ktime.h>
2f2dc053 14
2f2dc053 15#include "super.h"
3d14c5d2
YS
16#include "mds_client.h"
17
1fe60e51 18#include <linux/ceph/ceph_features.h>
3d14c5d2
YS
19#include <linux/ceph/messenger.h>
20#include <linux/ceph/decode.h>
21#include <linux/ceph/pagelist.h>
22#include <linux/ceph/auth.h>
23#include <linux/ceph/debugfs.h>
2f2dc053 24
81c5a148
YZ
25#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26
2f2dc053
SW
27/*
28 * A cluster of MDS (metadata server) daemons is responsible for
29 * managing the file system namespace (the directory hierarchy and
30 * inodes) and for coordinating shared access to storage. Metadata is
31 * partitioning hierarchically across a number of servers, and that
32 * partition varies over time as the cluster adjusts the distribution
33 * in order to balance load.
34 *
35 * The MDS client is primarily responsible to managing synchronous
36 * metadata requests for operations like open, unlink, and so forth.
37 * If there is a MDS failure, we find out about it when we (possibly
38 * request and) receive a new MDS map, and can resubmit affected
39 * requests.
40 *
41 * For the most part, though, we take advantage of a lossless
42 * communications channel to the MDS, and do not need to worry about
43 * timing out or resubmitting requests.
44 *
45 * We maintain a stateful "session" with each MDS we interact with.
46 * Within each session, we sent periodic heartbeat messages to ensure
47 * any capabilities or leases we have been issues remain valid. If
48 * the session times out and goes stale, our leases and capabilities
49 * are no longer valid.
50 */
51
20cb34ae 52struct ceph_reconnect_state {
81c5a148
YZ
53 struct ceph_mds_session *session;
54 int nr_caps, nr_realms;
20cb34ae 55 struct ceph_pagelist *pagelist;
121f22a1 56 unsigned msg_version;
81c5a148 57 bool allow_multi;
20cb34ae
SW
58};
59
2f2dc053
SW
60static void __wake_requests(struct ceph_mds_client *mdsc,
61 struct list_head *head);
e3ec8d68 62static void ceph_cap_release_work(struct work_struct *work);
37c4efc1 63static void ceph_cap_reclaim_work(struct work_struct *work);
2f2dc053 64
9e32789f 65static const struct ceph_connection_operations mds_con_ops;
2f2dc053
SW
66
67
68/*
69 * mds reply parsing
70 */
71
b37fe1f9
YZ
72static int parse_reply_info_quota(void **p, void *end,
73 struct ceph_mds_reply_info_in *info)
74{
75 u8 struct_v, struct_compat;
76 u32 struct_len;
77
78 ceph_decode_8_safe(p, end, struct_v, bad);
79 ceph_decode_8_safe(p, end, struct_compat, bad);
80 /* struct_v is expected to be >= 1. we only
81 * understand encoding with struct_compat == 1. */
82 if (!struct_v || struct_compat != 1)
83 goto bad;
84 ceph_decode_32_safe(p, end, struct_len, bad);
85 ceph_decode_need(p, end, struct_len, bad);
86 end = *p + struct_len;
87 ceph_decode_64_safe(p, end, info->max_bytes, bad);
88 ceph_decode_64_safe(p, end, info->max_files, bad);
89 *p = end;
90 return 0;
91bad:
92 return -EIO;
93}
94
2f2dc053
SW
95/*
96 * parse individual inode info
97 */
98static int parse_reply_info_in(void **p, void *end,
14303d20 99 struct ceph_mds_reply_info_in *info,
12b4629a 100 u64 features)
2f2dc053 101{
b37fe1f9
YZ
102 int err = 0;
103 u8 struct_v = 0;
2f2dc053 104
b37fe1f9
YZ
105 if (features == (u64)-1) {
106 u32 struct_len;
107 u8 struct_compat;
108 ceph_decode_8_safe(p, end, struct_v, bad);
109 ceph_decode_8_safe(p, end, struct_compat, bad);
110 /* struct_v is expected to be >= 1. we only understand
111 * encoding with struct_compat == 1. */
112 if (!struct_v || struct_compat != 1)
113 goto bad;
114 ceph_decode_32_safe(p, end, struct_len, bad);
115 ceph_decode_need(p, end, struct_len, bad);
116 end = *p + struct_len;
117 }
118
119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
2f2dc053
SW
120 info->in = *p;
121 *p += sizeof(struct ceph_mds_reply_inode) +
122 sizeof(*info->in->fragtree.splits) *
123 le32_to_cpu(info->in->fragtree.nsplits);
124
125 ceph_decode_32_safe(p, end, info->symlink_len, bad);
126 ceph_decode_need(p, end, info->symlink_len, bad);
127 info->symlink = *p;
128 *p += info->symlink_len;
129
23c625ce
ID
130 ceph_decode_copy_safe(p, end, &info->dir_layout,
131 sizeof(info->dir_layout), bad);
2f2dc053
SW
132 ceph_decode_32_safe(p, end, info->xattr_len, bad);
133 ceph_decode_need(p, end, info->xattr_len, bad);
134 info->xattr_data = *p;
135 *p += info->xattr_len;
fb01d1f8 136
b37fe1f9
YZ
137 if (features == (u64)-1) {
138 /* inline data */
fb01d1f8
YZ
139 ceph_decode_64_safe(p, end, info->inline_version, bad);
140 ceph_decode_32_safe(p, end, info->inline_len, bad);
141 ceph_decode_need(p, end, info->inline_len, bad);
142 info->inline_data = *p;
143 *p += info->inline_len;
b37fe1f9
YZ
144 /* quota */
145 err = parse_reply_info_quota(p, end, info);
146 if (err < 0)
147 goto out_bad;
148 /* pool namespace */
149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150 if (info->pool_ns_len > 0) {
151 ceph_decode_need(p, end, info->pool_ns_len, bad);
152 info->pool_ns_data = *p;
153 *p += info->pool_ns_len;
154 }
245ce991
JL
155
156 /* btime */
157 ceph_decode_need(p, end, sizeof(info->btime), bad);
158 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
160 /* change attribute */
a35ead31 161 ceph_decode_64_safe(p, end, info->change_attr, bad);
fb01d1f8 162
08796873
YZ
163 /* dir pin */
164 if (struct_v >= 2) {
165 ceph_decode_32_safe(p, end, info->dir_pin, bad);
166 } else {
167 info->dir_pin = -ENODATA;
168 }
169
193e7b37
DD
170 /* snapshot birth time, remains zero for v<=2 */
171 if (struct_v >= 3) {
172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173 ceph_decode_copy(p, &info->snap_btime,
174 sizeof(info->snap_btime));
175 } else {
176 memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177 }
178
b37fe1f9
YZ
179 *p = end;
180 } else {
181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
182 ceph_decode_64_safe(p, end, info->inline_version, bad);
183 ceph_decode_32_safe(p, end, info->inline_len, bad);
184 ceph_decode_need(p, end, info->inline_len, bad);
185 info->inline_data = *p;
186 *p += info->inline_len;
187 } else
188 info->inline_version = CEPH_INLINE_NONE;
189
190 if (features & CEPH_FEATURE_MDS_QUOTA) {
191 err = parse_reply_info_quota(p, end, info);
192 if (err < 0)
193 goto out_bad;
194 } else {
195 info->max_bytes = 0;
196 info->max_files = 0;
197 }
198
199 info->pool_ns_len = 0;
200 info->pool_ns_data = NULL;
201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
203 if (info->pool_ns_len > 0) {
204 ceph_decode_need(p, end, info->pool_ns_len, bad);
205 info->pool_ns_data = *p;
206 *p += info->pool_ns_len;
207 }
208 }
08796873 209
245ce991
JL
210 if (features & CEPH_FEATURE_FS_BTIME) {
211 ceph_decode_need(p, end, sizeof(info->btime), bad);
212 ceph_decode_copy(p, &info->btime, sizeof(info->btime));
a35ead31 213 ceph_decode_64_safe(p, end, info->change_attr, bad);
245ce991
JL
214 }
215
08796873 216 info->dir_pin = -ENODATA;
193e7b37 217 /* info->snap_btime remains zero */
b37fe1f9
YZ
218 }
219 return 0;
220bad:
221 err = -EIO;
222out_bad:
223 return err;
224}
225
226static int parse_reply_info_dir(void **p, void *end,
227 struct ceph_mds_reply_dirfrag **dirfrag,
228 u64 features)
229{
230 if (features == (u64)-1) {
fb18a575
LH
231 u8 struct_v, struct_compat;
232 u32 struct_len;
fb18a575
LH
233 ceph_decode_8_safe(p, end, struct_v, bad);
234 ceph_decode_8_safe(p, end, struct_compat, bad);
b37fe1f9
YZ
235 /* struct_v is expected to be >= 1. we only understand
236 * encoding whose struct_compat == 1. */
237 if (!struct_v || struct_compat != 1)
fb18a575
LH
238 goto bad;
239 ceph_decode_32_safe(p, end, struct_len, bad);
240 ceph_decode_need(p, end, struct_len, bad);
b37fe1f9 241 end = *p + struct_len;
fb18a575
LH
242 }
243
b37fe1f9
YZ
244 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
245 *dirfrag = *p;
246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
247 if (unlikely(*p > end))
248 goto bad;
249 if (features == (u64)-1)
250 *p = end;
251 return 0;
252bad:
253 return -EIO;
254}
255
256static int parse_reply_info_lease(void **p, void *end,
257 struct ceph_mds_reply_lease **lease,
258 u64 features)
259{
260 if (features == (u64)-1) {
261 u8 struct_v, struct_compat;
262 u32 struct_len;
263 ceph_decode_8_safe(p, end, struct_v, bad);
264 ceph_decode_8_safe(p, end, struct_compat, bad);
265 /* struct_v is expected to be >= 1. we only understand
266 * encoding whose struct_compat == 1. */
267 if (!struct_v || struct_compat != 1)
268 goto bad;
269 ceph_decode_32_safe(p, end, struct_len, bad);
270 ceph_decode_need(p, end, struct_len, bad);
271 end = *p + struct_len;
5ea5c5e0
YZ
272 }
273
b37fe1f9
YZ
274 ceph_decode_need(p, end, sizeof(**lease), bad);
275 *lease = *p;
276 *p += sizeof(**lease);
277 if (features == (u64)-1)
278 *p = end;
2f2dc053
SW
279 return 0;
280bad:
b37fe1f9 281 return -EIO;
2f2dc053
SW
282}
283
284/*
285 * parse a normal reply, which may contain a (dir+)dentry and/or a
286 * target inode.
287 */
288static int parse_reply_info_trace(void **p, void *end,
14303d20 289 struct ceph_mds_reply_info_parsed *info,
12b4629a 290 u64 features)
2f2dc053
SW
291{
292 int err;
293
294 if (info->head->is_dentry) {
14303d20 295 err = parse_reply_info_in(p, end, &info->diri, features);
2f2dc053
SW
296 if (err < 0)
297 goto out_bad;
298
b37fe1f9
YZ
299 err = parse_reply_info_dir(p, end, &info->dirfrag, features);
300 if (err < 0)
301 goto out_bad;
2f2dc053
SW
302
303 ceph_decode_32_safe(p, end, info->dname_len, bad);
304 ceph_decode_need(p, end, info->dname_len, bad);
305 info->dname = *p;
306 *p += info->dname_len;
b37fe1f9
YZ
307
308 err = parse_reply_info_lease(p, end, &info->dlease, features);
309 if (err < 0)
310 goto out_bad;
2f2dc053
SW
311 }
312
313 if (info->head->is_target) {
14303d20 314 err = parse_reply_info_in(p, end, &info->targeti, features);
2f2dc053
SW
315 if (err < 0)
316 goto out_bad;
317 }
318
319 if (unlikely(*p != end))
320 goto bad;
321 return 0;
322
323bad:
324 err = -EIO;
325out_bad:
326 pr_err("problem parsing mds trace %d\n", err);
327 return err;
328}
329
330/*
331 * parse readdir results
332 */
b37fe1f9 333static int parse_reply_info_readdir(void **p, void *end,
14303d20 334 struct ceph_mds_reply_info_parsed *info,
12b4629a 335 u64 features)
2f2dc053
SW
336{
337 u32 num, i = 0;
338 int err;
339
b37fe1f9
YZ
340 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
341 if (err < 0)
342 goto out_bad;
2f2dc053
SW
343
344 ceph_decode_need(p, end, sizeof(num) + 2, bad);
c89136ea 345 num = ceph_decode_32(p);
956d39d6
YZ
346 {
347 u16 flags = ceph_decode_16(p);
348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
f3c4ebe6 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
79162547 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
956d39d6 352 }
2f2dc053
SW
353 if (num == 0)
354 goto done;
355
2a5beea3
YZ
356 BUG_ON(!info->dir_entries);
357 if ((unsigned long)(info->dir_entries + num) >
358 (unsigned long)info->dir_entries + info->dir_buf_size) {
54008399
YZ
359 pr_err("dir contents are larger than expected\n");
360 WARN_ON(1);
361 goto bad;
362 }
2f2dc053 363
54008399 364 info->dir_nr = num;
2f2dc053 365 while (num) {
2a5beea3 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
2f2dc053 367 /* dentry */
b37fe1f9 368 ceph_decode_32_safe(p, end, rde->name_len, bad);
2a5beea3
YZ
369 ceph_decode_need(p, end, rde->name_len, bad);
370 rde->name = *p;
371 *p += rde->name_len;
372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
2f2dc053 373
b37fe1f9
YZ
374 /* dentry lease */
375 err = parse_reply_info_lease(p, end, &rde->lease, features);
376 if (err)
377 goto out_bad;
2f2dc053 378 /* inode */
2a5beea3 379 err = parse_reply_info_in(p, end, &rde->inode, features);
2f2dc053
SW
380 if (err < 0)
381 goto out_bad;
8974eebd
YZ
382 /* ceph_readdir_prepopulate() will update it */
383 rde->offset = 0;
2f2dc053
SW
384 i++;
385 num--;
386 }
387
388done:
1d3f8723
JL
389 /* Skip over any unrecognized fields */
390 *p = end;
2f2dc053
SW
391 return 0;
392
393bad:
394 err = -EIO;
395out_bad:
396 pr_err("problem parsing dir contents %d\n", err);
397 return err;
398}
399
25933abd
HS
400/*
401 * parse fcntl F_GETLK results
402 */
403static int parse_reply_info_filelock(void **p, void *end,
14303d20 404 struct ceph_mds_reply_info_parsed *info,
12b4629a 405 u64 features)
25933abd
HS
406{
407 if (*p + sizeof(*info->filelock_reply) > end)
408 goto bad;
409
410 info->filelock_reply = *p;
25933abd 411
1d3f8723
JL
412 /* Skip over any unrecognized fields */
413 *p = end;
25933abd 414 return 0;
25933abd
HS
415bad:
416 return -EIO;
417}
418
d4846487
JL
419
420#if BITS_PER_LONG == 64
421
422#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
423
424static int ceph_parse_deleg_inos(void **p, void *end,
425 struct ceph_mds_session *s)
426{
427 u32 sets;
428
429 ceph_decode_32_safe(p, end, sets, bad);
430 dout("got %u sets of delegated inodes\n", sets);
431 while (sets--) {
432 u64 start, len, ino;
433
434 ceph_decode_64_safe(p, end, start, bad);
435 ceph_decode_64_safe(p, end, len, bad);
436 while (len--) {
437 int err = xa_insert(&s->s_delegated_inos, ino = start++,
438 DELEGATED_INO_AVAILABLE,
439 GFP_KERNEL);
440 if (!err) {
441 dout("added delegated inode 0x%llx\n",
442 start - 1);
443 } else if (err == -EBUSY) {
444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
445 start - 1);
446 } else {
447 return err;
448 }
449 }
450 }
451 return 0;
452bad:
453 return -EIO;
454}
455
456u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
457{
458 unsigned long ino;
459 void *val;
460
461 xa_for_each(&s->s_delegated_inos, ino, val) {
462 val = xa_erase(&s->s_delegated_inos, ino);
463 if (val == DELEGATED_INO_AVAILABLE)
464 return ino;
465 }
466 return 0;
467}
468
469int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
470{
471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
472 GFP_KERNEL);
473}
474#else /* BITS_PER_LONG == 64 */
475/*
476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
478 * and bottom words?
479 */
480static int ceph_parse_deleg_inos(void **p, void *end,
481 struct ceph_mds_session *s)
482{
483 u32 sets;
484
485 ceph_decode_32_safe(p, end, sets, bad);
486 if (sets)
487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
488 return 0;
489bad:
490 return -EIO;
491}
492
493u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
494{
495 return 0;
496}
497
498int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
499{
500 return 0;
501}
502#endif /* BITS_PER_LONG == 64 */
503
6e8575fa
SL
504/*
505 * parse create results
506 */
507static int parse_reply_info_create(void **p, void *end,
508 struct ceph_mds_reply_info_parsed *info,
d4846487 509 u64 features, struct ceph_mds_session *s)
6e8575fa 510{
d4846487
JL
511 int ret;
512
b37fe1f9
YZ
513 if (features == (u64)-1 ||
514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
6e8575fa 515 if (*p == end) {
d4846487 516 /* Malformed reply? */
6e8575fa 517 info->has_create_ino = false;
d4846487
JL
518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
519 u8 struct_v, struct_compat;
520 u32 len;
521
6e8575fa 522 info->has_create_ino = true;
d4846487
JL
523 ceph_decode_8_safe(p, end, struct_v, bad);
524 ceph_decode_8_safe(p, end, struct_compat, bad);
525 ceph_decode_32_safe(p, end, len, bad);
526 ceph_decode_64_safe(p, end, info->ino, bad);
527 ret = ceph_parse_deleg_inos(p, end, s);
528 if (ret)
529 return ret;
530 } else {
531 /* legacy */
1d3f8723 532 ceph_decode_64_safe(p, end, info->ino, bad);
d4846487 533 info->has_create_ino = true;
6e8575fa 534 }
1d3f8723
JL
535 } else {
536 if (*p != end)
537 goto bad;
6e8575fa
SL
538 }
539
1d3f8723
JL
540 /* Skip over any unrecognized fields */
541 *p = end;
6e8575fa 542 return 0;
6e8575fa
SL
543bad:
544 return -EIO;
545}
546
25933abd
HS
547/*
548 * parse extra results
549 */
550static int parse_reply_info_extra(void **p, void *end,
14303d20 551 struct ceph_mds_reply_info_parsed *info,
d4846487 552 u64 features, struct ceph_mds_session *s)
25933abd 553{
6df8c9d8
JL
554 u32 op = le32_to_cpu(info->head->op);
555
556 if (op == CEPH_MDS_OP_GETFILELOCK)
14303d20 557 return parse_reply_info_filelock(p, end, info, features);
6df8c9d8 558 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
b37fe1f9 559 return parse_reply_info_readdir(p, end, info, features);
6df8c9d8 560 else if (op == CEPH_MDS_OP_CREATE)
d4846487 561 return parse_reply_info_create(p, end, info, features, s);
6e8575fa
SL
562 else
563 return -EIO;
25933abd
HS
564}
565
2f2dc053
SW
566/*
567 * parse entire mds reply
568 */
d4846487 569static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
14303d20 570 struct ceph_mds_reply_info_parsed *info,
12b4629a 571 u64 features)
2f2dc053
SW
572{
573 void *p, *end;
574 u32 len;
575 int err;
576
577 info->head = msg->front.iov_base;
578 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
579 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
580
581 /* trace */
582 ceph_decode_32_safe(&p, end, len, bad);
583 if (len > 0) {
32852a81 584 ceph_decode_need(&p, end, len, bad);
14303d20 585 err = parse_reply_info_trace(&p, p+len, info, features);
2f2dc053
SW
586 if (err < 0)
587 goto out_bad;
588 }
589
25933abd 590 /* extra */
2f2dc053
SW
591 ceph_decode_32_safe(&p, end, len, bad);
592 if (len > 0) {
32852a81 593 ceph_decode_need(&p, end, len, bad);
d4846487 594 err = parse_reply_info_extra(&p, p+len, info, features, s);
2f2dc053
SW
595 if (err < 0)
596 goto out_bad;
597 }
598
599 /* snap blob */
600 ceph_decode_32_safe(&p, end, len, bad);
601 info->snapblob_len = len;
602 info->snapblob = p;
603 p += len;
604
605 if (p != end)
606 goto bad;
607 return 0;
608
609bad:
610 err = -EIO;
611out_bad:
612 pr_err("mds parse_reply err %d\n", err);
613 return err;
614}
615
616static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
617{
2a5beea3 618 if (!info->dir_entries)
54008399 619 return;
2a5beea3 620 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
2f2dc053
SW
621}
622
623
624/*
625 * sessions
626 */
a687ecaf 627const char *ceph_session_state_name(int s)
2f2dc053
SW
628{
629 switch (s) {
630 case CEPH_MDS_SESSION_NEW: return "new";
631 case CEPH_MDS_SESSION_OPENING: return "opening";
632 case CEPH_MDS_SESSION_OPEN: return "open";
633 case CEPH_MDS_SESSION_HUNG: return "hung";
634 case CEPH_MDS_SESSION_CLOSING: return "closing";
4d681c2f 635 case CEPH_MDS_SESSION_CLOSED: return "closed";
44ca18f2 636 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
2f2dc053 637 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
fcff415c 638 case CEPH_MDS_SESSION_REJECTED: return "rejected";
2f2dc053
SW
639 default: return "???";
640 }
641}
642
5b3248c6 643struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
2f2dc053 644{
3997c01d 645 if (refcount_inc_not_zero(&s->s_ref)) {
2f2dc053 646 dout("mdsc get_session %p %d -> %d\n", s,
3997c01d 647 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
2f2dc053
SW
648 return s;
649 } else {
4c069a58 650 dout("mdsc get_session %p 0 -- FAIL\n", s);
2f2dc053
SW
651 return NULL;
652 }
653}
654
655void ceph_put_mds_session(struct ceph_mds_session *s)
656{
657 dout("mdsc put_session %p %d -> %d\n", s,
3997c01d
ER
658 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
659 if (refcount_dec_and_test(&s->s_ref)) {
6c4a1915 660 if (s->s_auth.authorizer)
6c1ea260 661 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
d4846487 662 xa_destroy(&s->s_delegated_inos);
2f2dc053 663 kfree(s);
4e7a5dcd 664 }
2f2dc053
SW
665}
666
667/*
668 * called under mdsc->mutex
669 */
670struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
671 int mds)
672{
d37b1d99 673 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
2f2dc053 674 return NULL;
5b3248c6 675 return ceph_get_mds_session(mdsc->sessions[mds]);
2f2dc053
SW
676}
677
678static bool __have_session(struct ceph_mds_client *mdsc, int mds)
679{
98cfda81 680 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
2f2dc053 681 return false;
98cfda81
CX
682 else
683 return true;
2f2dc053
SW
684}
685
2600d2dd
SW
686static int __verify_registered_session(struct ceph_mds_client *mdsc,
687 struct ceph_mds_session *s)
688{
689 if (s->s_mds >= mdsc->max_sessions ||
690 mdsc->sessions[s->s_mds] != s)
691 return -ENOENT;
692 return 0;
693}
694
2f2dc053
SW
695/*
696 * create+register a new session for given mds.
697 * called under mdsc->mutex.
698 */
699static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
700 int mds)
701{
702 struct ceph_mds_session *s;
703
b38c9eb4 704 if (mds >= mdsc->mdsmap->possible_max_rank)
c338c07c
NY
705 return ERR_PTR(-EINVAL);
706
2f2dc053 707 s = kzalloc(sizeof(*s), GFP_NOFS);
4736b009
DC
708 if (!s)
709 return ERR_PTR(-ENOMEM);
47474d0b
CX
710
711 if (mds >= mdsc->max_sessions) {
712 int newmax = 1 << get_count_order(mds + 1);
713 struct ceph_mds_session **sa;
714
715 dout("%s: realloc to %d\n", __func__, newmax);
716 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
717 if (!sa)
718 goto fail_realloc;
719 if (mdsc->sessions) {
720 memcpy(sa, mdsc->sessions,
721 mdsc->max_sessions * sizeof(void *));
722 kfree(mdsc->sessions);
723 }
724 mdsc->sessions = sa;
725 mdsc->max_sessions = newmax;
726 }
727
728 dout("%s: mds%d\n", __func__, mds);
2f2dc053
SW
729 s->s_mdsc = mdsc;
730 s->s_mds = mds;
731 s->s_state = CEPH_MDS_SESSION_NEW;
732 s->s_ttl = 0;
733 s->s_seq = 0;
734 mutex_init(&s->s_mutex);
735
b7a9e5dd 736 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
2f2dc053 737
d8fb02ab 738 spin_lock_init(&s->s_gen_ttl_lock);
1e9c2eb6 739 s->s_cap_gen = 1;
1ce208a6 740 s->s_cap_ttl = jiffies - 1;
d8fb02ab
AE
741
742 spin_lock_init(&s->s_cap_lock);
2f2dc053
SW
743 s->s_renew_requested = 0;
744 s->s_renew_seq = 0;
745 INIT_LIST_HEAD(&s->s_caps);
746 s->s_nr_caps = 0;
3997c01d 747 refcount_set(&s->s_ref, 1);
2f2dc053
SW
748 INIT_LIST_HEAD(&s->s_waiting);
749 INIT_LIST_HEAD(&s->s_unsafe);
d4846487 750 xa_init(&s->s_delegated_inos);
2f2dc053 751 s->s_num_cap_releases = 0;
99a9c273 752 s->s_cap_reconnect = 0;
7c1332b8 753 s->s_cap_iterator = NULL;
2f2dc053 754 INIT_LIST_HEAD(&s->s_cap_releases);
e3ec8d68
YZ
755 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
756
2f2dc053 757 INIT_LIST_HEAD(&s->s_cap_flushing);
2f2dc053 758
2f2dc053 759 mdsc->sessions[mds] = s;
86d8f67b 760 atomic_inc(&mdsc->num_sessions);
3997c01d 761 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
42ce56e5 762
b7a9e5dd
SW
763 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
764 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
42ce56e5 765
2f2dc053 766 return s;
42ce56e5
SW
767
768fail_realloc:
769 kfree(s);
770 return ERR_PTR(-ENOMEM);
2f2dc053
SW
771}
772
773/*
774 * called under mdsc->mutex
775 */
2600d2dd 776static void __unregister_session(struct ceph_mds_client *mdsc,
42ce56e5 777 struct ceph_mds_session *s)
2f2dc053 778{
2600d2dd
SW
779 dout("__unregister_session mds%d %p\n", s->s_mds, s);
780 BUG_ON(mdsc->sessions[s->s_mds] != s);
42ce56e5
SW
781 mdsc->sessions[s->s_mds] = NULL;
782 ceph_con_close(&s->s_con);
783 ceph_put_mds_session(s);
86d8f67b 784 atomic_dec(&mdsc->num_sessions);
2f2dc053
SW
785}
786
787/*
788 * drop session refs in request.
789 *
790 * should be last request ref, or hold mdsc->mutex
791 */
792static void put_request_session(struct ceph_mds_request *req)
793{
794 if (req->r_session) {
795 ceph_put_mds_session(req->r_session);
796 req->r_session = NULL;
797 }
798}
799
153c8e6b 800void ceph_mdsc_release_request(struct kref *kref)
2f2dc053 801{
153c8e6b
SW
802 struct ceph_mds_request *req = container_of(kref,
803 struct ceph_mds_request,
804 r_kref);
a25949b9 805 ceph_mdsc_release_dir_caps(req);
54008399 806 destroy_reply_info(&req->r_reply_info);
153c8e6b
SW
807 if (req->r_request)
808 ceph_msg_put(req->r_request);
54008399 809 if (req->r_reply)
153c8e6b 810 ceph_msg_put(req->r_reply);
153c8e6b 811 if (req->r_inode) {
41b02e1f 812 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3e1d0452
YZ
813 /* avoid calling iput_final() in mds dispatch threads */
814 ceph_async_iput(req->r_inode);
153c8e6b 815 }
9c1c2b35 816 if (req->r_parent) {
3dd69aab 817 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
9c1c2b35
JL
818 ceph_async_iput(req->r_parent);
819 }
3e1d0452 820 ceph_async_iput(req->r_target_inode);
153c8e6b
SW
821 if (req->r_dentry)
822 dput(req->r_dentry);
844d87c3
SW
823 if (req->r_old_dentry)
824 dput(req->r_old_dentry);
825 if (req->r_old_dentry_dir) {
41b02e1f
SW
826 /*
827 * track (and drop pins for) r_old_dentry_dir
828 * separately, since r_old_dentry's d_parent may have
829 * changed between the dir mutex being dropped and
830 * this request being freed.
831 */
832 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
833 CEPH_CAP_PIN);
3e1d0452 834 ceph_async_iput(req->r_old_dentry_dir);
2f2dc053 835 }
153c8e6b
SW
836 kfree(req->r_path1);
837 kfree(req->r_path2);
25e6bae3
YZ
838 if (req->r_pagelist)
839 ceph_pagelist_release(req->r_pagelist);
153c8e6b 840 put_request_session(req);
37151668 841 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
428138c9 842 WARN_ON_ONCE(!list_empty(&req->r_wait));
058daab7 843 kmem_cache_free(ceph_mds_request_cachep, req);
2f2dc053
SW
844}
845
fcd00b68
ID
846DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
847
2f2dc053
SW
848/*
849 * lookup session, bump ref if found.
850 *
851 * called under mdsc->mutex.
852 */
fcd00b68
ID
853static struct ceph_mds_request *
854lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
2f2dc053
SW
855{
856 struct ceph_mds_request *req;
44ca18f2 857
fcd00b68
ID
858 req = lookup_request(&mdsc->request_tree, tid);
859 if (req)
860 ceph_mdsc_get_request(req);
44ca18f2 861
fcd00b68 862 return req;
2f2dc053
SW
863}
864
865/*
866 * Register an in-flight request, and assign a tid. Link to directory
867 * are modifying (if any).
868 *
869 * Called under mdsc->mutex.
870 */
871static void __register_request(struct ceph_mds_client *mdsc,
872 struct ceph_mds_request *req,
873 struct inode *dir)
874{
e30ee581
ZZ
875 int ret = 0;
876
2f2dc053 877 req->r_tid = ++mdsc->last_tid;
e30ee581
ZZ
878 if (req->r_num_caps) {
879 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
880 req->r_num_caps);
881 if (ret < 0) {
882 pr_err("__register_request %p "
883 "failed to reserve caps: %d\n", req, ret);
884 /* set req->r_err to fail early from __do_request */
885 req->r_err = ret;
886 return;
887 }
888 }
2f2dc053
SW
889 dout("__register_request %p tid %lld\n", req, req->r_tid);
890 ceph_mdsc_get_request(req);
fcd00b68 891 insert_request(&mdsc->request_tree, req);
2f2dc053 892
cb4276cc
SW
893 req->r_uid = current_fsuid();
894 req->r_gid = current_fsgid();
895
e8a7b8b1
YZ
896 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
897 mdsc->oldest_tid = req->r_tid;
898
2f2dc053 899 if (dir) {
3db0a2fc
JL
900 struct ceph_inode_info *ci = ceph_inode(dir);
901
3b663780 902 ihold(dir);
2f2dc053 903 req->r_unsafe_dir = dir;
3db0a2fc
JL
904 spin_lock(&ci->i_unsafe_lock);
905 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
906 spin_unlock(&ci->i_unsafe_lock);
2f2dc053
SW
907 }
908}
909
910static void __unregister_request(struct ceph_mds_client *mdsc,
911 struct ceph_mds_request *req)
912{
913 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
e8a7b8b1 914
df963ea8
JL
915 /* Never leave an unregistered request on an unsafe list! */
916 list_del_init(&req->r_unsafe_item);
917
e8a7b8b1
YZ
918 if (req->r_tid == mdsc->oldest_tid) {
919 struct rb_node *p = rb_next(&req->r_node);
920 mdsc->oldest_tid = 0;
921 while (p) {
922 struct ceph_mds_request *next_req =
923 rb_entry(p, struct ceph_mds_request, r_node);
924 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
925 mdsc->oldest_tid = next_req->r_tid;
926 break;
927 }
928 p = rb_next(p);
929 }
930 }
931
fcd00b68 932 erase_request(&mdsc->request_tree, req);
2f2dc053 933
3db0a2fc 934 if (req->r_unsafe_dir) {
2f2dc053 935 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
2f2dc053
SW
936 spin_lock(&ci->i_unsafe_lock);
937 list_del_init(&req->r_unsafe_dir_item);
938 spin_unlock(&ci->i_unsafe_lock);
4c06ace8 939 }
bc2de10d
JL
940 if (req->r_target_inode &&
941 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
68cd5b4b
YZ
942 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
943 spin_lock(&ci->i_unsafe_lock);
944 list_del_init(&req->r_unsafe_target_item);
945 spin_unlock(&ci->i_unsafe_lock);
946 }
3b663780 947
4c06ace8 948 if (req->r_unsafe_dir) {
3e1d0452
YZ
949 /* avoid calling iput_final() in mds dispatch threads */
950 ceph_async_iput(req->r_unsafe_dir);
3b663780 951 req->r_unsafe_dir = NULL;
2f2dc053 952 }
94aa8ae1 953
fc55d2c9
YZ
954 complete_all(&req->r_safe_completion);
955
94aa8ae1 956 ceph_mdsc_put_request(req);
2f2dc053
SW
957}
958
30c71233
JL
959/*
960 * Walk back up the dentry tree until we hit a dentry representing a
961 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
962 * when calling this) to ensure that the objects won't disappear while we're
963 * working with them. Once we hit a candidate dentry, we attempt to take a
964 * reference to it, and return that as the result.
965 */
f1075480
DC
966static struct inode *get_nonsnap_parent(struct dentry *dentry)
967{
968 struct inode *inode = NULL;
30c71233
JL
969
970 while (dentry && !IS_ROOT(dentry)) {
971 inode = d_inode_rcu(dentry);
972 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
973 break;
974 dentry = dentry->d_parent;
975 }
976 if (inode)
977 inode = igrab(inode);
978 return inode;
979}
980
2f2dc053
SW
981/*
982 * Choose mds to send request to next. If there is a hint set in the
983 * request (e.g., due to a prior forward hint from the mds), use that.
984 * Otherwise, consult frag tree and/or caps to identify the
985 * appropriate mds. If all else fails, choose randomly.
986 *
987 * Called under mdsc->mutex.
988 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	/* tell the caller whether we fell back to a random pick */
	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/*
	 * Pick the inode whose caps/frag tree will steer the choice:
	 * the request inode itself, or an inode derived from r_dentry.
	 * Every path below that sets 'inode' also takes a reference on
	 * it; it is released via ceph_async_iput() before returning.
	 */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	/* consult the directory fragment tree when we have a name hash */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (mode == USE_ANY_MDS &&
					    !ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	/* fall back to the mds holding a (preferably auth) cap */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		ceph_async_iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	/* avoid calling iput_final() while holding mdsc->mutex or
	 * in mds dispatch threads */
	ceph_async_iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}
1144
1145
1146/*
1147 * session messages
1148 */
1149static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1150{
1151 struct ceph_msg *msg;
1152 struct ceph_mds_session_head *h;
1153
b61c2763
SW
1154 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1155 false);
a79832f2 1156 if (!msg) {
2f2dc053 1157 pr_err("create_session_msg ENOMEM creating msg\n");
a79832f2 1158 return NULL;
2f2dc053
SW
1159 }
1160 h = msg->front.iov_base;
1161 h->op = cpu_to_le32(op);
1162 h->seq = cpu_to_le64(seq);
dbd0c8bf
JS
1163
1164 return msg;
1165}
1166
9ba1e224
XL
1167static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1168#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
342ce182
YZ
1169static void encode_supported_features(void **p, void *end)
1170{
9ba1e224 1171 static const size_t count = ARRAY_SIZE(feature_bits);
342ce182
YZ
1172
1173 if (count > 0) {
1174 size_t i;
9ba1e224 1175 size_t size = FEATURE_BYTES(count);
342ce182
YZ
1176
1177 BUG_ON(*p + 4 + size > end);
1178 ceph_encode_32(p, size);
1179 memset(*p, 0, size);
1180 for (i = 0; i < count; i++)
9ba1e224 1181 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
342ce182
YZ
1182 *p += size;
1183 } else {
1184 BUG_ON(*p + 4 > end);
1185 ceph_encode_32(p, 0);
1186 }
1187}
1188
dbd0c8bf
JS
1189/*
1190 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1191 * to include additional client metadata fields.
1192 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i = -1;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	size_t size, count;
	void *p, *end;

	/* NULL-terminated table of key/value metadata pairs to send */
	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		/* 8 = two 32-bit length prefixes (key + value) */
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature: 32-bit length + feature byte array */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v3
	 */
	msg->hdr.version = cpu_to_le16(3);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	/* append the supported-feature bitset, then fix up final lengths */
	encode_supported_features(&p, end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
1276
1277/*
1278 * send session open request.
1279 *
1280 * called under mdsc->mutex
1281 */
1282static int __open_session(struct ceph_mds_client *mdsc,
1283 struct ceph_mds_session *session)
1284{
1285 struct ceph_msg *msg;
1286 int mstate;
1287 int mds = session->s_mds;
2f2dc053
SW
1288
1289 /* wait for mds to go active? */
1290 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1291 dout("open_session to mds%d (%s)\n", mds,
1292 ceph_mds_state_name(mstate));
1293 session->s_state = CEPH_MDS_SESSION_OPENING;
1294 session->s_renew_requested = jiffies;
1295
1296 /* send connect message */
dbd0c8bf 1297 msg = create_session_open_msg(mdsc, session->s_seq);
a79832f2
SW
1298 if (!msg)
1299 return -ENOMEM;
2f2dc053 1300 ceph_con_send(&session->s_con, msg);
2f2dc053
SW
1301 return 0;
1302}
1303
ed0552a1
SW
1304/*
1305 * open sessions for any export targets for the given mds
1306 *
1307 * called under mdsc->mutex
1308 */
5d72d13c
YZ
1309static struct ceph_mds_session *
1310__open_export_target_session(struct ceph_mds_client *mdsc, int target)
1311{
1312 struct ceph_mds_session *session;
1313
1314 session = __ceph_lookup_mds_session(mdsc, target);
1315 if (!session) {
1316 session = register_session(mdsc, target);
1317 if (IS_ERR(session))
1318 return session;
1319 }
1320 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1321 session->s_state == CEPH_MDS_SESSION_CLOSING)
1322 __open_session(mdsc, session);
1323
1324 return session;
1325}
1326
1327struct ceph_mds_session *
1328ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1329{
1330 struct ceph_mds_session *session;
1331
1332 dout("open_export_target_session to mds%d\n", target);
1333
1334 mutex_lock(&mdsc->mutex);
1335 session = __open_export_target_session(mdsc, target);
1336 mutex_unlock(&mdsc->mutex);
1337
1338 return session;
1339}
1340
ed0552a1
SW
1341static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1342 struct ceph_mds_session *session)
1343{
1344 struct ceph_mds_info *mi;
1345 struct ceph_mds_session *ts;
1346 int i, mds = session->s_mds;
ed0552a1 1347
b38c9eb4 1348 if (mds >= mdsc->mdsmap->possible_max_rank)
ed0552a1 1349 return;
5d72d13c 1350
ed0552a1
SW
1351 mi = &mdsc->mdsmap->m_info[mds];
1352 dout("open_export_target_sessions for mds%d (%d targets)\n",
1353 session->s_mds, mi->num_export_targets);
1354
1355 for (i = 0; i < mi->num_export_targets; i++) {
5d72d13c
YZ
1356 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1357 if (!IS_ERR(ts))
1358 ceph_put_mds_session(ts);
ed0552a1
SW
1359 }
1360}
1361
154f42c2
SW
/*
 * Public wrapper: open sessions to all of @session's export targets,
 * taking mdsc->mutex around the work.
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}
1369
2f2dc053
SW
1370/*
1371 * session caps
1372 */
1373
c8a96a31
JL
1374static void detach_cap_releases(struct ceph_mds_session *session,
1375 struct list_head *target)
2f2dc053 1376{
c8a96a31
JL
1377 lockdep_assert_held(&session->s_cap_lock);
1378
1379 list_splice_init(&session->s_cap_releases, target);
745a8e3b 1380 session->s_num_cap_releases = 0;
c8a96a31
JL
1381 dout("dispose_cap_releases mds%d\n", session->s_mds);
1382}
2f2dc053 1383
c8a96a31
JL
1384static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1385 struct list_head *dispose)
1386{
1387 while (!list_empty(dispose)) {
745a8e3b
YZ
1388 struct ceph_cap *cap;
1389 /* zero out the in-progress message */
c8a96a31 1390 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
745a8e3b
YZ
1391 list_del(&cap->session_caps);
1392 ceph_put_cap(mdsc, cap);
2f2dc053 1393 }
2f2dc053
SW
1394}
1395
1c841a96
YZ
/*
 * Throw away the session's unsafe (unacked) requests and reset the
 * retry counters of its in-flight requests so kick_requests() will
 * resend them.  Any inode touched by a dropped unsafe request gets its
 * i_meta_err errseq poked so fsync()/close() can report the data loss.
 */
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;
	struct ceph_inode_info *ci;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		if (req->r_target_inode) {
			/* dropping unsafe change of inode's attributes */
			ci = ceph_inode(req->r_target_inode);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		if (req->r_unsafe_dir) {
			/* dropping unsafe directory operation */
			ci = ceph_inode(req->r_unsafe_dir);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}
1433
2f2dc053 1434/*
f818a736
SW
1435 * Helper to safely iterate over all caps associated with a session, with
1436 * special care taken to handle a racing __ceph_remove_cap().
2f2dc053 1437 *
f818a736 1438 * Caller must hold session s_mutex.
2f2dc053 1439 */
f5d77269
JL
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, struct ceph_cap *,
					void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		/* skip inodes that are being freed */
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		/*
		 * s_cap_iterator pins this cap's list position so a
		 * concurrent __ceph_remove_cap() leaves the final unlink
		 * to us (see the !cap->ci branch below).
		 */
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			/* avoid calling iput_final() while holding
			 * s_mutex or in mds dispatch threads */
			ceph_async_iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		/* invoke the callback without s_cap_lock held */
		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			/* cap was removed while we ran the callback;
			 * finish detaching it from the session here */
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	/* release any deferred inode/cap references */
	ceph_async_iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
1505
/*
 * Per-cap callback for remove_session_caps(): drop the cap and, if it
 * was the last (auth) cap, tear down the inode's dirty/flushing state,
 * recording -EIO in i_meta_err for anything that is being thrown away.
 * @arg is the ceph_fs_client.  Always returns 0 (keep iterating).
 */
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);		/* cap flushes to free after unlock */
	bool dirty_dropped = false;
	bool invalidate = false;

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&ci->i_ceph_lock);
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc = fsc->mdsc;

		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
			if (inode->i_data.nrpages > 0)
				invalidate = true;
			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		/* collect pending cap flushes; freed below without locks */
		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_move(&cf->i_list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		/* also unhook them from the global flush list */
		list_for_each_entry(cf, &to_remove, i_list)
			list_del(&cf->g_list);

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
				" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited(
				" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			errseq_set(&ci->i_meta_err, -EIO);

			/* drop the snap context ref the dirty state held */
			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscall return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
					    inode, ceph_ino(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	/* free the collected cap flushes now that no locks are held */
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	if (dirty_dropped)
		/* drop the inode ref the dirty state held */
		iput(inode);
	return 0;
}
1602
1603/*
1604 * caller must hold session s_mutex
1605 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			/* no progress since last pass? stop looping */
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			/* avoid calling iput_final() while holding s_mutex */
			ceph_async_iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* detach pending cap releases for disposal after we unlock */
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}
1654
d2f8bb27
YZ
/*
 * Events passed (as the opaque arg) to wake_up_session_cb(), selecting
 * what per-cap bookkeeping to do before waking i_cap_wq waiters.
 */
enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};
1660
2f2dc053
SW
1661/*
1662 * wake up any threads waiting on this session's caps. if the cap is
1663 * old (didn't get renewed on the client reconnect), remove it now.
1664 *
1665 * caller must hold s_mutex.
1666 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	/* one of RECONNECT / RENEWCAPS / FORCE_RO */
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		/* forget pre-reconnect max_size state; it will be
		 * re-requested as needed */
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		if (cap->cap_gen < cap->session->s_cap_gen) {
			/* mds did not re-issue stale cap */
			spin_lock(&ci->i_ceph_lock);
			cap->issued = cap->implemented = CEPH_CAP_PIN;
			spin_unlock(&ci->i_ceph_lock);
		}
	} else if (ev == FORCE_RO) {
		/* nothing per-cap to do; just wake the waiters below */
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}
1690
d2f8bb27 1691static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
2f2dc053
SW
1692{
1693 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
f5d77269
JL
1694 ceph_iterate_session_caps(session, wake_up_session_cb,
1695 (void *)(unsigned long)ev);
2f2dc053
SW
1696}
1697
1698/*
1699 * Send periodic message to MDS renewing all currently held caps. The
1700 * ack will reset the expiration for all caps from this session.
1701 *
1702 * caller holds s_mutex
1703 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	/* caps went stale since the last renew attempt? say so once */
	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
1733
186e4f7a
YZ
1734static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1735 struct ceph_mds_session *session, u64 seq)
1736{
1737 struct ceph_msg *msg;
1738
1739 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
a687ecaf 1740 session->s_mds, ceph_session_state_name(session->s_state), seq);
186e4f7a
YZ
1741 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1742 if (!msg)
1743 return -ENOMEM;
1744 ceph_con_send(&session->s_con, msg);
1745 return 0;
1746}
1747
1748
2f2dc053
SW
1749/*
1750 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
0dc2570f
SW
1751 *
1752 * Called under session->s_mutex
2f2dc053
SW
1753 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	/* were the caps already expired when this renewal arrived? */
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	/* new ttl is measured from when we *asked* for the renewal */
	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			/* wake waiters outside the lock, below */
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}
1782
1783/*
1784 * send a session close request
1785 */
1786static int request_close_session(struct ceph_mds_client *mdsc,
1787 struct ceph_mds_session *session)
1788{
1789 struct ceph_msg *msg;
2f2dc053
SW
1790
1791 dout("request_close_session mds%d state %s seq %lld\n",
a687ecaf 1792 session->s_mds, ceph_session_state_name(session->s_state),
2f2dc053
SW
1793 session->s_seq);
1794 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
a79832f2
SW
1795 if (!msg)
1796 return -ENOMEM;
1797 ceph_con_send(&session->s_con, msg);
fcff415c 1798 return 1;
2f2dc053
SW
1799}
1800
1801/*
1802 * Called with s_mutex held.
1803 */
1804static int __close_session(struct ceph_mds_client *mdsc,
1805 struct ceph_mds_session *session)
1806{
1807 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1808 return 0;
1809 session->s_state = CEPH_MDS_SESSION_CLOSING;
1810 return request_close_session(mdsc, session);
1811}
1812
040d7860
YZ
1813static bool drop_negative_children(struct dentry *dentry)
1814{
1815 struct dentry *child;
1816 bool all_negative = true;
1817
1818 if (!d_is_dir(dentry))
1819 goto out;
1820
1821 spin_lock(&dentry->d_lock);
1822 list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1823 if (d_really_is_positive(child)) {
1824 all_negative = false;
1825 break;
1826 }
1827 }
1828 spin_unlock(&dentry->d_lock);
1829
1830 if (all_negative)
1831 shrink_dcache_parent(dentry);
1832out:
1833 return all_negative;
1834}
1835
2f2dc053
SW
1836/*
1837 * Trim old(er) caps.
1838 *
1839 * Because we can't cache an inode without one or more caps, we do
1840 * this indirectly: if a cap is unused, we prune its aliases, at which
1841 * point the inode will hopefully get dropped to.
1842 *
1843 * Yes, this is a bit sloppy. Our only real goal here is to respond to
1844 * memory pressure from the MDS, though, so it needn't be perfect.
1845 */
/*
 * Per-cap callback for ceph_trim_caps().  @arg points at the remaining
 * trim budget; each cap (or inode) we manage to drop decrements it.
 * Returns -1 to stop iterating once the budget is exhausted, else 0.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		/* never drop the auth cap while there is dirty/flushing
		 * state, cap snaps, or write caps in use/wanted */
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			/* count == 1 means only our igrab ref remains, so
			 * the inode (and its cap) will actually go away */
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}
1915
1916/*
1917 * Trim session cap count down to some max number.
1918 */
e30ee581
ZZ
1919int ceph_trim_caps(struct ceph_mds_client *mdsc,
1920 struct ceph_mds_session *session,
1921 int max_caps)
2f2dc053
SW
1922{
1923 int trim_caps = session->s_nr_caps - max_caps;
1924
1925 dout("trim_caps mds%d start: %d / %d, trim %d\n",
1926 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1927 if (trim_caps > 0) {
533a2818
JL
1928 int remaining = trim_caps;
1929
1930 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2f2dc053
SW
1931 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1932 session->s_mds, session->s_nr_caps, max_caps,
533a2818 1933 trim_caps - remaining);
2f2dc053 1934 }
a56371d9 1935
e3ec8d68 1936 ceph_flush_cap_releases(mdsc, session);
2f2dc053
SW
1937 return 0;
1938}
1939
8310b089
YZ
1940static int check_caps_flush(struct ceph_mds_client *mdsc,
1941 u64 want_flush_tid)
1942{
8310b089
YZ
1943 int ret = 1;
1944
1945 spin_lock(&mdsc->cap_dirty_lock);
e4500b5e
YZ
1946 if (!list_empty(&mdsc->cap_flush_list)) {
1947 struct ceph_cap_flush *cf =
1948 list_first_entry(&mdsc->cap_flush_list,
1949 struct ceph_cap_flush, g_list);
1950 if (cf->tid <= want_flush_tid) {
1951 dout("check_caps_flush still flushing tid "
1952 "%llu <= %llu\n", cf->tid, want_flush_tid);
1953 ret = 0;
1954 }
8310b089
YZ
1955 }
1956 spin_unlock(&mdsc->cap_dirty_lock);
1957 return ret;
d3383a8e
YZ
1958}
1959
2f2dc053
SW
1960/*
1961 * flush all dirty inode data to disk.
1962 *
8310b089 1963 * returns true if we've flushed through want_flush_tid
2f2dc053 1964 */
affbc19a 1965static void wait_caps_flush(struct ceph_mds_client *mdsc,
0e294387 1966 u64 want_flush_tid)
2f2dc053 1967{
0e294387 1968 dout("check_caps_flush want %llu\n", want_flush_tid);
8310b089
YZ
1969
1970 wait_event(mdsc->cap_flushing_wq,
1971 check_caps_flush(mdsc, want_flush_tid));
1972
1973 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2f2dc053
SW
1974}
1975
/*
 * Send pending cap releases for this session to the MDS, batching up
 * to CEPH_CAPS_PER_RELEASE items per CAPRELEASE message.  Each message
 * carries the current OSD epoch barrier (msg v2 trailer).
 *
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	/* snapshot the epoch barrier to embed in each outgoing message */
	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	/* detach the queued caps so we can work without the lock */
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		/* message full: seal it with the barrier and send */
		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	/* more caps may have been queued while we worked unlocked */
	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	/* flush any final partially-filled message */
	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	/* allocation failed: requeue what we detached and bail */
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
	       session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
2072
e3ec8d68
YZ
/*
 * Workqueue handler for a session's deferred cap releases; drops the
 * session reference taken by ceph_flush_cap_releases() when queued.
 */
static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	/* only talk to a session that is still usable */
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}
2085
/*
 * Schedule ceph_cap_release_work() for @session.  Takes a session
 * reference for the work item; the reference is dropped here if the
 * work was already queued (queue_work() returned false) or by the
 * work handler otherwise.  No-op while the client is shutting down.
 */
void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	if (mdsc->stopping)
		return;

	ceph_get_mds_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		dout("cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		dout("failed to queue cap release work\n");
	}
}
2101
/*
 * Queue a cap onto the session's release list; kick the flush work
 * every CEPH_CAPS_PER_RELEASE caps so releases go out in batches.
 *
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_cap_releases(session->s_mdsc, session);
}
2114
37c4efc1
YZ
/*
 * Workqueue handler that trims dentries to reclaim caps; requeues
 * itself while ceph_trim_dentries() reports more work (-EAGAIN).
 */
static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}
2123
2124void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2125{
2126 if (mdsc->stopping)
2127 return;
2128
2129 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2130 dout("caps reclaim work queued\n");
2131 } else {
2132 dout("failed to queue caps release work\n");
2133 }
2134}
2135
fe33032d
YZ
/*
 * Account @nr caps as pending reclaim.  Once the running total crosses
 * a CEPH_CAPS_PER_RELEASE boundary (detected via the modulo of the
 * post-add value being smaller than @nr), reset the counter and kick
 * the reclaim work.
 */
void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}
2147
2f2dc053
SW
2148/*
2149 * requests
2150 */
2151
54008399
YZ
/*
 * Size and allocate the buffer that will hold parsed readdir reply
 * entries for @req, based on the directory's current entry count
 * (clamped to [1, opt->max_readdir]).  Falls back to progressively
 * smaller page orders on allocation failure.
 *
 * Returns 0 on success or -ENOMEM if even order-0 allocation fails.
 */
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	unsigned int num_entries;
	int order;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1U);
	num_entries = min(num_entries, opt->max_readdir);

	/* try the ideal size first, then halve until something fits */
	order = get_order(size * num_entries);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries)
		return -ENOMEM;

	/* recompute the entry count from what we actually got */
	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	/* +1 cap for the directory itself */
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}
2189
2f2dc053
SW
/*
 * Create an mds request.
 *
 * Allocates a zeroed request from the slab cache and initializes its
 * locks, lists, completions, refcount and timestamps.  @op is the MDS
 * operation code, @mode the r_direct_mode placement hint.  Returns the
 * new request (kref == 1) or ERR_PTR(-ENOMEM).
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req;

	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_start_latency = ktime_get();	/* for perf metrics */
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	ktime_get_coarse_real_ts64(&req->r_stamp);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}
2223
2224/*
44ca18f2 2225 * return oldest (lowest) request, tid in request tree, 0 if none.
2f2dc053
SW
2226 *
2227 * called under mdsc->mutex.
2228 */
44ca18f2
SW
2229static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2230{
2231 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2232 return NULL;
2233 return rb_entry(rb_first(&mdsc->request_tree),
2234 struct ceph_mds_request, r_node);
2235}
2236
e8a7b8b1 2237static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2f2dc053 2238{
e8a7b8b1 2239 return mdsc->oldest_tid;
2f2dc053
SW
2240}
2241
/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 *
 * The path is built backwards into a __getname() buffer; *plen gets
 * the path length and *pbase the ino of the dentry the path is
 * relative to.  The walk runs under RCU and rename_lock's seqcount,
 * retrying from scratch if a concurrent rename invalidated it.
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	/* fill from the end of the buffer toward the front */
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	seq = read_seqbegin(&rename_lock);
	rcu_read_lock();
	temp = dentry;
	for (;;) {
		struct inode *inode;

		spin_lock(&temp->d_lock);
		inode = d_inode(temp);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* .snap dir is hidden: emit nothing -> double '/' */
			dout("build_path path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode && dentry != temp &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&temp->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0) {
				spin_unlock(&temp->d_lock);
				break;
			}
			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
		}
		spin_unlock(&temp->d_lock);
		temp = READ_ONCE(temp->d_parent);

		/* Are we at the root? */
		if (IS_ROOT(temp))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	base = ceph_ino(d_inode(temp));
	rcu_read_unlock();

	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * A rename didn't occur, but somehow we didn't end up where
		 * we thought we would. Throw a warning and try again.
		 */
		pr_warn("build_path did not end path lookup where "
			"expected, pos is %d\n", pos);
		goto retry;
	}

	*pbase = base;
	*plen = PATH_MAX - 1 - pos;
	dout("build_path on %p %d built %llx '%.*s'\n",
	     dentry, d_count(dentry), base, *plen, path + pos);
	return path + pos;
}
2330
fd36a717 2331static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2f2dc053 2332 const char **ppath, int *ppathlen, u64 *pino,
1bcb3440 2333 bool *pfreepath, bool parent_locked)
2f2dc053
SW
2334{
2335 char *path;
2336
c6b0b656 2337 rcu_read_lock();
fd36a717
JL
2338 if (!dir)
2339 dir = d_inode_rcu(dentry->d_parent);
964fff74 2340 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
c6b0b656
JL
2341 *pino = ceph_ino(dir);
2342 rcu_read_unlock();
964fff74
JL
2343 *ppath = dentry->d_name.name;
2344 *ppathlen = dentry->d_name.len;
2f2dc053
SW
2345 return 0;
2346 }
c6b0b656 2347 rcu_read_unlock();
2f2dc053
SW
2348 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2349 if (IS_ERR(path))
2350 return PTR_ERR(path);
2351 *ppath = path;
1bcb3440 2352 *pfreepath = true;
2f2dc053
SW
2353 return 0;
2354}
2355
/*
 * Produce a (ino, path) pair identifying @inode for an MDS request.
 * A non-snapped inode is addressed by ino alone (empty path); a
 * snapped inode is addressed via a path built from one of its
 * aliases, which the caller must free (*pfreepath set to true).
 */
static int build_inode_path(struct inode *inode,
			    const char **ppath, int *ppathlen, u64 *pino,
			    bool *pfreepath)
{
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(inode);
		*ppathlen = 0;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = true;
	return 0;
}
2377
/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 *
 * Resolves whichever form was supplied (checked in that priority
 * order) into the *ppath/*pathlen/*ino triple used when encoding the
 * request; *freepath tells the caller whether *ppath was allocated.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				 struct inode *rdiri, const char *rpath,
				 u64 rino, const char **ppath, int *pathlen,
				 u64 *ino, bool *freepath, bool parent_locked)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
				      freepath, parent_locked);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}
2407
/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req destined for
 * @mds: resolve both path arguments, size and allocate the message,
 * fill the request head, encode the two filepaths, the cap/dentry
 * releases, and the timestamp trailer.  @drop_cap_releases discards
 * any encoded releases (used on resend).
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two encoded filepaths + timestamp */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		/* rewind over whatever releases we just encoded */
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	/* time stamp */
	{
		struct ceph_timespec ts;
		ceph_encode_timespec64(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}
2544
/*
 * Record the request completion time (for latency metrics), invoke
 * the optional completion callback, and wake all waiters.
 *
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}
2558
/*
 * Prepare req->r_request for transmission to @mds: for a replay
 * (GOT_UNSAFE) reuse and patch the original message in place;
 * otherwise build a fresh request message.  Returns 0 or a negative
 * error (also stored in req->r_err).
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		/* remember the cap mseq this attempt was sent under */
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* time stamp */
		p = msg->front.iov_base + req->r_request_release_offset;
		{
			struct ceph_timespec ts;
			ceph_encode_timespec64(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		}

		/* truncate the message right after the timestamp */
		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}
2645
9cf54563
XL
2646/*
2647 * called under mdsc->mutex
2648 */
2649static int __send_request(struct ceph_mds_client *mdsc,
2650 struct ceph_mds_session *session,
2651 struct ceph_mds_request *req,
2652 bool drop_cap_releases)
2653{
2654 int err;
2655
2656 err = __prepare_send_request(mdsc, req, session->s_mds,
2657 drop_cap_releases);
2658 if (!err) {
2659 ceph_msg_get(req->r_request);
2660 ceph_con_send(&session->s_con, req->r_request);
2661 }
2662
2663 return err;
2664}
2665
2f2dc053
SW
/*
 * send request, or put it on the appropriate wait list.
 *
 * Early-exits with an error (completing and unregistering the
 * request) on timeout, forced umount, mdsmap problems, or when an
 * async request cannot be serviced (-EJUKEBOX tells the caller to
 * retry synchronously).  Parks the request on a wait list when no
 * usable mds/session is available yet.  Called under mdsc->mutex.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = ceph_get_mds_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			err = -EACCES;
			goto out_session;
		}
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			__open_session(mdsc, session);
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __send_request(mdsc, session, req, false);

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}
2785
/*
 * Resubmit every request on @head via __do_request().  The list is
 * spliced to a local list first because __do_request() may re-add
 * entries to wait lists as it goes.
 *
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}
2805
/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * Skips requests that already got an unsafe reply (those must replay
 * to the same mds) and requests that were already attempted.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* advance before __do_request() can touch the tree entry */
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}
2831
86bda539 2832int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2f2dc053
SW
2833 struct ceph_mds_request *req)
2834{
891f3f5a 2835 int err = 0;
86bda539
JL
2836
2837 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2838 if (req->r_inode)
2839 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
9c1c2b35 2840 if (req->r_parent) {
719a2514
YZ
2841 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2842 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2843 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2844 spin_lock(&ci->i_ceph_lock);
2845 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2846 __ceph_touch_fmode(ci, mdsc, fmode);
2847 spin_unlock(&ci->i_ceph_lock);
9c1c2b35
JL
2848 ihold(req->r_parent);
2849 }
86bda539
JL
2850 if (req->r_old_dentry_dir)
2851 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2852 CEPH_CAP_PIN);
2853
891f3f5a
JL
2854 if (req->r_inode) {
2855 err = ceph_wait_on_async_create(req->r_inode);
2856 if (err) {
2857 dout("%s: wait for async create returned: %d\n",
2858 __func__, err);
2859 return err;
2860 }
2861 }
2862
2863 if (!err && req->r_old_inode) {
2864 err = ceph_wait_on_async_create(req->r_old_inode);
2865 if (err) {
2866 dout("%s: wait for async create returned: %d\n",
2867 __func__, err);
2868 return err;
2869 }
2870 }
2871
86bda539 2872 dout("submit_request on %p for inode %p\n", req, dir);
2f2dc053 2873 mutex_lock(&mdsc->mutex);
86bda539 2874 __register_request(mdsc, req, dir);
2f2dc053 2875 __do_request(mdsc, req);
86bda539 2876 err = req->r_err;
2f2dc053 2877 mutex_unlock(&mdsc->mutex);
86bda539 2878 return err;
2f2dc053
SW
2879}
2880
8340f22c
JL
/*
 * Wait for @req to complete (via the request's custom waiter, or a
 * killable wait with optional timeout).  On interruption/timeout,
 * abort the request unless a real reply already arrived; the
 * r_fill_mutex handshake keeps the abort from racing with reply
 * processing that relies on caller-held locks.
 *
 * Returns the MDS result on a real reply, otherwise the wait error
 * or req->r_err.
 */
static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}
2930
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int result;

	dout("do_request on %p\n", req);

	/* issue, then wait for the reply (or abort) */
	result = ceph_mdsc_submit_request(mdsc, dir, req);
	if (result)
		goto out;
	result = ceph_mdsc_wait_request(mdsc, req);
out:
	dout("do_request %p done, result %d\n", req, result);
	return result;
}
2950
167c9e35 2951/*
2f276c51 2952 * Invalidate dir's completeness, dentry lease state on an aborted MDS
167c9e35
SW
2953 * namespace request.
2954 */
2955void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2956{
8d8f371c
YZ
2957 struct inode *dir = req->r_parent;
2958 struct inode *old_dir = req->r_old_dentry_dir;
167c9e35 2959
8d8f371c 2960 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
167c9e35 2961
8d8f371c
YZ
2962 ceph_dir_clear_complete(dir);
2963 if (old_dir)
2964 ceph_dir_clear_complete(old_dir);
167c9e35
SW
2965 if (req->r_dentry)
2966 ceph_invalidate_dentry_lease(req->r_dentry);
2967 if (req->r_old_dentry)
2968 ceph_invalidate_dentry_lease(req->r_old_dentry);
2969}
2970
2f2dc053
SW
/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 *
 * Lock ordering in this function: mdsc->mutex is taken first for the
 * request lookup/state transitions, then dropped before session->s_mutex
 * is taken to apply the trace; mdsc->mutex is re-taken at the end to
 * publish the result to the waiter.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	/* takes a reference on req; dropped at 'out' below */
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
			   tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Handle an ESTALE
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	 */
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu\n", req->r_tid);
		req->r_resend_mds = -1;
		if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now\n");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		} else {
			int mds = __choose_mds(mdsc, req, NULL);
			if (mds >= 0 && mds != req->r_session->s_mds) {
				dout("but auth changed, so resending\n");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
				goto out;
			}
		}
		dout("have to return ESTALE on request %llu\n", req->r_tid);
	}


	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);

			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	/* newer MDSes use the versioned reply encoding regardless of features */
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(session, msg, rinfo, (u64)-1);
	else
		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		/* write lock while the realm hierarchy is updated, then
		 * downgrade so fill_trace runs under the read lock */
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		/* track unsafe requests against the target inode so fsync
		 * can wait for the safe reply */
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	/* only record the result if we didn't race with an abort */
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	/* fold this request's round-trip time into the metadata metrics */
	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	ceph_mdsc_put_request(req);
	return;
}
3169
3170
3171
/*
 * handle mds notification that our request has been forwarded.
 *
 * Payload is { next_mds: u32, fwd_seq: u32 }.  Depending on request
 * state we either drop it (aborted or stale forward seq) or re-issue
 * it toward the new MDS.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	/* takes a reference on req; dropped before 'out' */
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		/* caller gave up on this request; drop it for good */
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/* stale notification -- we already acted on a newer one */
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
3223
131d7eb4
YZ
3224static int __decode_session_metadata(void **p, void *end,
3225 bool *blacklisted)
84bf3950
YZ
3226{
3227 /* map<string,string> */
3228 u32 n;
131d7eb4 3229 bool err_str;
84bf3950
YZ
3230 ceph_decode_32_safe(p, end, n, bad);
3231 while (n-- > 0) {
3232 u32 len;
3233 ceph_decode_32_safe(p, end, len, bad);
3234 ceph_decode_need(p, end, len, bad);
131d7eb4 3235 err_str = !strncmp(*p, "error_string", len);
84bf3950
YZ
3236 *p += len;
3237 ceph_decode_32_safe(p, end, len, bad);
3238 ceph_decode_need(p, end, len, bad);
131d7eb4
YZ
3239 if (err_str && strnstr(*p, "blacklisted", len))
3240 *blacklisted = true;
84bf3950
YZ
3241 *p += len;
3242 }
3243 return 0;
3244bad:
3245 return -1;
3246}
3247
2f2dc053
SW
/*
 * handle a mds session control message
 *
 * Decodes the session head (plus metadata and feature bits for v3+
 * messages) and drives the session state machine accordingly.  For
 * CEPH_SESSION_CLOSE the session is unregistered up front while an
 * extra reference keeps it alive until the end of the function.
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	u32 op;
	u64 seq, features = 0;
	int wake = 0;
	bool blacklisted = false;

	/* decode */
	ceph_decode_need(&p, end, sizeof(*h), bad);
	h = p;
	p += sizeof(*h);

	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	if (msg_version >= 3) {
		u32 len;
		/* version >= 2, metadata */
		if (__decode_session_metadata(&p, end, &blacklisted) < 0)
			goto bad;
		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		/* only the first 64 feature bits are consumed; skip the rest */
		ceph_decode_64_safe(&p, end, features, bad);
		p += len - sizeof(features);
	}

	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE) {
		/* hold a ref so the session survives until we're done */
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		session->s_features = features;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		/* only honor the ack for the renew we actually sent */
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_CLOSED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		/* bump the cap generation so stale caps are ignored */
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		dout("force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info("mds%d rejected session\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		if (blacklisted)
			mdsc->fsc->blacklisted = true;
		wake = 2; /* for good measure */
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}
3391
a25949b9
JL
3392void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3393{
3394 int dcaps;
3395
3396 dcaps = xchg(&req->r_dir_caps, 0);
3397 if (dcaps) {
3398 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3399 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3400 }
3401}
3402
2f2dc053
SW
3403/*
3404 * called under session->mutex.
3405 */
3406static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3407 struct ceph_mds_session *session)
3408{
3409 struct ceph_mds_request *req, *nreq;
3de22be6 3410 struct rb_node *p;
2f2dc053
SW
3411
3412 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3413
3414 mutex_lock(&mdsc->mutex);
9cf54563
XL
3415 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3416 __send_request(mdsc, session, req, true);
3de22be6
YZ
3417
3418 /*
3419 * also re-send old requests when MDS enters reconnect stage. So that MDS
3420 * can process completed request in clientreplay stage.
3421 */
3422 p = rb_first(&mdsc->request_tree);
3423 while (p) {
3424 req = rb_entry(p, struct ceph_mds_request, r_node);
3425 p = rb_next(p);
bc2de10d 3426 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3de22be6
YZ
3427 continue;
3428 if (req->r_attempts == 0)
3429 continue; /* only old requests */
a25949b9
JL
3430 if (!req->r_session)
3431 continue;
3432 if (req->r_session->s_mds != session->s_mds)
3433 continue;
3434
3435 ceph_mdsc_release_dir_caps(req);
3436
3437 __send_request(mdsc, session, req, true);
3de22be6 3438 }
2f2dc053
SW
3439 mutex_unlock(&mdsc->mutex);
3440}
3441
81c5a148
YZ
/*
 * Ship the partially-filled reconnect message and install a fresh
 * pagelist so encoding can continue.  Used when the reconnect payload
 * would exceed RECONNECT_MAX_SIZE; requires the MDS to support
 * multi-message reconnects (recon_state->allow_multi).
 *
 * Returns 0 on success (recon_state now points at a new, empty
 * pagelist and counters are reset) or a negative errno.
 */
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding relams) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	/* trailing byte: tell the MDS more reconnect messages follow */
	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	/* patch the real cap/realm count into the first page */
	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding relams */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	/* drop our pagelist ref; the message carries its own */
	ceph_pagelist_release(recon_state->pagelist);

	/* continue encoding into the fresh pagelist */
	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}
3517
2f2dc053
SW
/*
 * Encode information about a cap for a reconnect with the MDS.
 *
 * Callback for ceph_iterate_session_caps(); @arg is the
 * ceph_reconnect_state being filled.  Encodes either the v2+ record
 * (including file locks and, for v3+, a versioned envelope) or the
 * legacy v1 record (which carries a path instead).  Returns 0/positive
 * on success (nr_caps is bumped) or a negative errno.
 */
static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	int err;
	u64 snap_follows;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));

	/* i_ceph_lock guards the cap fields and rec snapshot below */
	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = cap->session->s_cap_gen;

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = 0;
		/* temporarily abuse flock_len as a "send locks?" flag;
		 * the real byte length is filled in below */
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = 0;
	}

	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				/* locks changed between count and encode:
				 * retry with a fresh count */
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		/* flush a full message and continue in a fresh pagelist */
		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		/* empty path string; v2+ MDSes locate the inode by ino */
		ceph_pagelist_encode_string(pagelist, NULL, 0);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		/* legacy v1 record: identify the inode by path */
		u64 pathbase = 0;
		int pathlen = 0;
		char *path = NULL;
		struct dentry *dentry;

		dentry = d_find_alias(inode);
		if (dentry) {
			path = ceph_mdsc_build_path(dentry,
						&pathlen, &pathbase, 0);
			dput(dentry);
			if (IS_ERR(path)) {
				err = PTR_ERR(path);
				goto out_err;
			}
			rec.v1.pathbase = cpu_to_le64(pathbase);
		}

		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		if (err) {
			goto out_freepath;
		}

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
		ceph_mdsc_free_path(path, pathlen);
	}

out_err:
	if (err >= 0)
		recon_state->nr_caps++;
	return err;
}
3700
/*
 * Append a reconnect record for every snap realm we know about to the
 * reconnect pagelist.  For v4+ messages each record gets a versioned
 * envelope and an over-full pagelist is flushed via
 * send_reconnect_partial().  Returns 0 or a negative errno.
 */
static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			/* flush and start a fresh pagelist if this record
			 * would push us past the message size limit */
			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			/* version, compat_version, struct_len envelope */
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}
3759
3760
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info("mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	/* delegated inos from the old session instance are void */
	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	dout("session %p state %s\n", session,
	     ceph_session_state_name(session->s_state));

	/* invalidate all caps issued under the old generation */
	spin_lock(&session->s_gen_ttl_lock);
	session->s_cap_gen++;
	spin_unlock(&session->s_gen_ttl_lock);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap get released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	/* held until the reply is fully assembled and sent */
	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;

	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			/* flush the caps message before starting realms */
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		/* trailing byte: no further reconnect messages follow */
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		/* patch the real cap/realm count into the first page */
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
	return;
}
3940
3941
3942/*
3943 * compare old and new mdsmaps, kicking requests
3944 * and closing out old connections as necessary
3945 *
3946 * called under mdsc->mutex.
3947 */
3948static void check_new_map(struct ceph_mds_client *mdsc,
3949 struct ceph_mdsmap *newmap,
3950 struct ceph_mdsmap *oldmap)
3951{
3952 int i;
3953 int oldstate, newstate;
3954 struct ceph_mds_session *s;
3955
3956 dout("check_new_map new %u old %u\n",
3957 newmap->m_epoch, oldmap->m_epoch);
3958
b38c9eb4 3959 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
d37b1d99 3960 if (!mdsc->sessions[i])
2f2dc053
SW
3961 continue;
3962 s = mdsc->sessions[i];
3963 oldstate = ceph_mdsmap_get_state(oldmap, i);
3964 newstate = ceph_mdsmap_get_state(newmap, i);
3965
0deb01c9 3966 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2f2dc053 3967 i, ceph_mds_state_name(oldstate),
0deb01c9 3968 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2f2dc053 3969 ceph_mds_state_name(newstate),
0deb01c9 3970 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
a687ecaf 3971 ceph_session_state_name(s->s_state));
2f2dc053 3972
b38c9eb4 3973 if (i >= newmap->possible_max_rank) {
6f0f597b 3974 /* force close session for stopped mds */
5b3248c6 3975 ceph_get_mds_session(s);
6f0f597b
YZ
3976 __unregister_session(mdsc, s);
3977 __wake_requests(mdsc, &s->s_waiting);
3978 mutex_unlock(&mdsc->mutex);
2827528d 3979
6f0f597b
YZ
3980 mutex_lock(&s->s_mutex);
3981 cleanup_session_requests(mdsc, s);
3982 remove_session_caps(s);
3983 mutex_unlock(&s->s_mutex);
2827528d 3984
6f0f597b 3985 ceph_put_mds_session(s);
2827528d 3986
6f0f597b
YZ
3987 mutex_lock(&mdsc->mutex);
3988 kick_requests(mdsc, i);
3989 continue;
3990 }
3991
3992 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
3993 ceph_mdsmap_get_addr(newmap, i),
3994 sizeof(struct ceph_entity_addr))) {
3995 /* just close it */
3996 mutex_unlock(&mdsc->mutex);
3997 mutex_lock(&s->s_mutex);
3998 mutex_lock(&mdsc->mutex);
3999 ceph_con_close(&s->s_con);
4000 mutex_unlock(&s->s_mutex);
4001 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2f2dc053
SW
4002 } else if (oldstate == newstate) {
4003 continue; /* nothing new with this mds */
4004 }
4005
4006 /*
4007 * send reconnect?
4008 */
4009 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
34b6c855
SW
4010 newstate >= CEPH_MDS_STATE_RECONNECT) {
4011 mutex_unlock(&mdsc->mutex);
4012 send_mds_reconnect(mdsc, s);
4013 mutex_lock(&mdsc->mutex);
4014 }
2f2dc053
SW
4015
4016 /*
29790f26 4017 * kick request on any mds that has gone active.
2f2dc053
SW
4018 */
4019 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4020 newstate >= CEPH_MDS_STATE_ACTIVE) {
29790f26
SW
4021 if (oldstate != CEPH_MDS_STATE_CREATING &&
4022 oldstate != CEPH_MDS_STATE_STARTING)
4023 pr_info("mds%d recovery completed\n", s->s_mds);
4024 kick_requests(mdsc, i);
2f2dc053 4025 ceph_kick_flushing_caps(mdsc, s);
d2f8bb27 4026 wake_up_session_caps(s, RECONNECT);
2f2dc053
SW
4027 }
4028 }
cb170a22 4029
b38c9eb4 4030 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
cb170a22
SW
4031 s = mdsc->sessions[i];
4032 if (!s)
4033 continue;
4034 if (!ceph_mdsmap_is_laggy(newmap, i))
4035 continue;
4036 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4037 s->s_state == CEPH_MDS_SESSION_HUNG ||
4038 s->s_state == CEPH_MDS_SESSION_CLOSING) {
4039 dout(" connecting to export targets of laggy mds%d\n",
4040 i);
4041 __open_export_target_sessions(mdsc, s);
4042 }
4043 }
2f2dc053
SW
4044}
4045
4046
4047
4048/*
4049 * leases
4050 */
4051
4052/*
4053 * caller must hold session s_mutex, dentry->d_lock
4054 */
4055void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4056{
4057 struct ceph_dentry_info *di = ceph_dentry(dentry);
4058
4059 ceph_put_mds_session(di->lease_session);
4060 di->lease_session = NULL;
4061}
4062
2600d2dd
SW
4063static void handle_lease(struct ceph_mds_client *mdsc,
4064 struct ceph_mds_session *session,
4065 struct ceph_msg *msg)
2f2dc053 4066{
3d14c5d2 4067 struct super_block *sb = mdsc->fsc->sb;
2f2dc053 4068 struct inode *inode;
2f2dc053
SW
4069 struct dentry *parent, *dentry;
4070 struct ceph_dentry_info *di;
2600d2dd 4071 int mds = session->s_mds;
2f2dc053 4072 struct ceph_mds_lease *h = msg->front.iov_base;
1e5ea23d 4073 u32 seq;
2f2dc053 4074 struct ceph_vino vino;
2f2dc053
SW
4075 struct qstr dname;
4076 int release = 0;
4077
2f2dc053
SW
4078 dout("handle_lease from mds%d\n", mds);
4079
4080 /* decode */
4081 if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4082 goto bad;
4083 vino.ino = le64_to_cpu(h->ino);
4084 vino.snap = CEPH_NOSNAP;
1e5ea23d 4085 seq = le32_to_cpu(h->seq);
0fcf6c02
YZ
4086 dname.len = get_unaligned_le32(h + 1);
4087 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
2f2dc053 4088 goto bad;
0fcf6c02 4089 dname.name = (void *)(h + 1) + sizeof(u32);
2f2dc053 4090
2f2dc053
SW
4091 /* lookup inode */
4092 inode = ceph_find_inode(sb, vino);
2f90b852
SW
4093 dout("handle_lease %s, ino %llx %p %.*s\n",
4094 ceph_lease_op_name(h->action), vino.ino, inode,
1e5ea23d 4095 dname.len, dname.name);
6cd3bcad
YZ
4096
4097 mutex_lock(&session->s_mutex);
4098 session->s_seq++;
4099
d37b1d99 4100 if (!inode) {
2f2dc053
SW
4101 dout("handle_lease no inode %llx\n", vino.ino);
4102 goto release;
4103 }
2f2dc053
SW
4104
4105 /* dentry */
4106 parent = d_find_alias(inode);
4107 if (!parent) {
4108 dout("no parent dentry on inode %p\n", inode);
4109 WARN_ON(1);
4110 goto release; /* hrm... */
4111 }
8387ff25 4112 dname.hash = full_name_hash(parent, dname.name, dname.len);
2f2dc053
SW
4113 dentry = d_lookup(parent, &dname);
4114 dput(parent);
4115 if (!dentry)
4116 goto release;
4117
4118 spin_lock(&dentry->d_lock);
4119 di = ceph_dentry(dentry);
4120 switch (h->action) {
4121 case CEPH_MDS_LEASE_REVOKE:
3d8eb7a9 4122 if (di->lease_session == session) {
1e5ea23d
SW
4123 if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4124 h->seq = cpu_to_le32(di->lease_seq);
2f2dc053
SW
4125 __ceph_mdsc_drop_dentry_lease(dentry);
4126 }
4127 release = 1;
4128 break;
4129
4130 case CEPH_MDS_LEASE_RENEW:
3d8eb7a9 4131 if (di->lease_session == session &&
2f2dc053
SW
4132 di->lease_gen == session->s_cap_gen &&
4133 di->lease_renew_from &&
4134 di->lease_renew_after == 0) {
4135 unsigned long duration =
3563dbdd 4136 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
2f2dc053 4137
1e5ea23d 4138 di->lease_seq = seq;
9b16f03c 4139 di->time = di->lease_renew_from + duration;
2f2dc053
SW
4140 di->lease_renew_after = di->lease_renew_from +
4141 (duration >> 1);
4142 di->lease_renew_from = 0;
4143 }
4144 break;
4145 }
4146 spin_unlock(&dentry->d_lock);
4147 dput(dentry);
4148
4149 if (!release)
4150 goto out;
4151
4152release:
4153 /* let's just reuse the same message */
4154 h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4155 ceph_msg_get(msg);
4156 ceph_con_send(&session->s_con, msg);
4157
4158out:
2f2dc053 4159 mutex_unlock(&session->s_mutex);
3e1d0452
YZ
4160 /* avoid calling iput_final() in mds dispatch threads */
4161 ceph_async_iput(inode);
2f2dc053
SW
4162 return;
4163
4164bad:
4165 pr_err("corrupt lease message\n");
9ec7cab1 4166 ceph_msg_dump(msg);
2f2dc053
SW
4167}
4168
4169void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2f2dc053
SW
4170 struct dentry *dentry, char action,
4171 u32 seq)
4172{
4173 struct ceph_msg *msg;
4174 struct ceph_mds_lease *lease;
8f2a98ef
YZ
4175 struct inode *dir;
4176 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
2f2dc053 4177
8f2a98ef
YZ
4178 dout("lease_send_msg identry %p %s to mds%d\n",
4179 dentry, ceph_lease_op_name(action), session->s_mds);
2f2dc053 4180
b61c2763 4181 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
a79832f2 4182 if (!msg)
2f2dc053
SW
4183 return;
4184 lease = msg->front.iov_base;
4185 lease->action = action;
2f2dc053 4186 lease->seq = cpu_to_le32(seq);
2f2dc053 4187
8f2a98ef
YZ
4188 spin_lock(&dentry->d_lock);
4189 dir = d_inode(dentry->d_parent);
4190 lease->ino = cpu_to_le64(ceph_ino(dir));
4191 lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4192
4193 put_unaligned_le32(dentry->d_name.len, lease + 1);
4194 memcpy((void *)(lease + 1) + 4,
4195 dentry->d_name.name, dentry->d_name.len);
4196 spin_unlock(&dentry->d_lock);
2f2dc053
SW
4197 /*
4198 * if this is a preemptive lease RELEASE, no need to
4199 * flush request stream, since the actual request will
4200 * soon follow.
4201 */
4202 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4203
4204 ceph_con_send(&session->s_con, msg);
4205}
4206
2f2dc053 4207/*
7aac453a 4208 * lock unlock sessions, to wait ongoing session activities
2f2dc053 4209 */
7aac453a 4210static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
2f2dc053
SW
4211{
4212 int i;
4213
2f2dc053
SW
4214 mutex_lock(&mdsc->mutex);
4215 for (i = 0; i < mdsc->max_sessions; i++) {
4216 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4217 if (!s)
4218 continue;
4219 mutex_unlock(&mdsc->mutex);
4220 mutex_lock(&s->s_mutex);
4221 mutex_unlock(&s->s_mutex);
4222 ceph_put_mds_session(s);
4223 mutex_lock(&mdsc->mutex);
4224 }
4225 mutex_unlock(&mdsc->mutex);
4226}
4227
131d7eb4
YZ
4228static void maybe_recover_session(struct ceph_mds_client *mdsc)
4229{
4230 struct ceph_fs_client *fsc = mdsc->fsc;
4231
4232 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4233 return;
2f2dc053 4234
131d7eb4
YZ
4235 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4236 return;
4237
4238 if (!READ_ONCE(fsc->blacklisted))
4239 return;
4240
4241 if (fsc->last_auto_reconnect &&
4242 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4243 return;
4244
4245 pr_info("auto reconnect after blacklisted\n");
4246 fsc->last_auto_reconnect = jiffies;
4247 ceph_force_reconnect(fsc->sb);
4248}
2f2dc053
SW
4249
4250/*
4251 * delayed work -- periodically trim expired leases, renew caps with mds
4252 */
4253static void schedule_delayed(struct ceph_mds_client *mdsc)
4254{
4255 int delay = 5;
4256 unsigned hz = round_jiffies_relative(HZ * delay);
4257 schedule_delayed_work(&mdsc->delayed_work, hz);
4258}
4259
4260static void delayed_work(struct work_struct *work)
4261{
4262 int i;
4263 struct ceph_mds_client *mdsc =
4264 container_of(work, struct ceph_mds_client, delayed_work.work);
4265 int renew_interval;
4266 int renew_caps;
4267
4268 dout("mdsc delayed_work\n");
75c9627e 4269
2f2dc053
SW
4270 mutex_lock(&mdsc->mutex);
4271 renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4272 renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4273 mdsc->last_renew_caps);
4274 if (renew_caps)
4275 mdsc->last_renew_caps = jiffies;
4276
4277 for (i = 0; i < mdsc->max_sessions; i++) {
4278 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
d37b1d99 4279 if (!s)
2f2dc053
SW
4280 continue;
4281 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4282 dout("resending session close request for mds%d\n",
4283 s->s_mds);
4284 request_close_session(mdsc, s);
4285 ceph_put_mds_session(s);
4286 continue;
4287 }
4288 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4289 if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4290 s->s_state = CEPH_MDS_SESSION_HUNG;
4291 pr_info("mds%d hung\n", s->s_mds);
4292 }
4293 }
71a228bc
EC
4294 if (s->s_state == CEPH_MDS_SESSION_NEW ||
4295 s->s_state == CEPH_MDS_SESSION_RESTARTING ||
4296 s->s_state == CEPH_MDS_SESSION_REJECTED) {
2f2dc053
SW
4297 /* this mds is failed or recovering, just wait */
4298 ceph_put_mds_session(s);
4299 continue;
4300 }
4301 mutex_unlock(&mdsc->mutex);
4302
4303 mutex_lock(&s->s_mutex);
4304 if (renew_caps)
4305 send_renew_caps(mdsc, s);
4306 else
4307 ceph_con_keepalive(&s->s_con);
aab53dd9
SW
4308 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4309 s->s_state == CEPH_MDS_SESSION_HUNG)
3d7ded4d 4310 ceph_send_cap_releases(mdsc, s);
2f2dc053
SW
4311 mutex_unlock(&s->s_mutex);
4312 ceph_put_mds_session(s);
4313
4314 mutex_lock(&mdsc->mutex);
4315 }
4316 mutex_unlock(&mdsc->mutex);
4317
37c4efc1
YZ
4318 ceph_check_delayed_caps(mdsc);
4319
4320 ceph_queue_cap_reclaim_work(mdsc);
4321
4322 ceph_trim_snapid_map(mdsc);
4323
131d7eb4
YZ
4324 maybe_recover_session(mdsc);
4325
2f2dc053
SW
4326 schedule_delayed(mdsc);
4327}
4328
3d14c5d2 4329int ceph_mdsc_init(struct ceph_fs_client *fsc)
2f2dc053 4330
2f2dc053 4331{
3d14c5d2 4332 struct ceph_mds_client *mdsc;
f9009efa 4333 int err;
3d14c5d2
YS
4334
4335 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4336 if (!mdsc)
4337 return -ENOMEM;
4338 mdsc->fsc = fsc;
2f2dc053
SW
4339 mutex_init(&mdsc->mutex);
4340 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
d37b1d99 4341 if (!mdsc->mdsmap) {
f9009efa
XL
4342 err = -ENOMEM;
4343 goto err_mdsc;
fb3101b6 4344 }
2d06eeb8 4345
50c55aec 4346 fsc->mdsc = mdsc;
2f2dc053 4347 init_completion(&mdsc->safe_umount_waiters);
f3c60c59 4348 init_waitqueue_head(&mdsc->session_close_wq);
2f2dc053
SW
4349 INIT_LIST_HEAD(&mdsc->waiting_for_map);
4350 mdsc->sessions = NULL;
86d8f67b 4351 atomic_set(&mdsc->num_sessions, 0);
2f2dc053
SW
4352 mdsc->max_sessions = 0;
4353 mdsc->stopping = 0;
d557c48d 4354 atomic64_set(&mdsc->quotarealms_count, 0);
0c44a8e0
LH
4355 mdsc->quotarealms_inodes = RB_ROOT;
4356 mutex_init(&mdsc->quotarealms_inodes_mutex);
affbc19a 4357 mdsc->last_snap_seq = 0;
2f2dc053 4358 init_rwsem(&mdsc->snap_rwsem);
a105f00c 4359 mdsc->snap_realms = RB_ROOT;
2f2dc053 4360 INIT_LIST_HEAD(&mdsc->snap_empty);
81c5a148 4361 mdsc->num_snap_realms = 0;
2f2dc053
SW
4362 spin_lock_init(&mdsc->snap_empty_lock);
4363 mdsc->last_tid = 0;
e8a7b8b1 4364 mdsc->oldest_tid = 0;
44ca18f2 4365 mdsc->request_tree = RB_ROOT;
2f2dc053
SW
4366 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4367 mdsc->last_renew_caps = jiffies;
4368 INIT_LIST_HEAD(&mdsc->cap_delay_list);
3a3430af 4369 INIT_LIST_HEAD(&mdsc->cap_wait_list);
2f2dc053
SW
4370 spin_lock_init(&mdsc->cap_delay_lock);
4371 INIT_LIST_HEAD(&mdsc->snap_flush_list);
4372 spin_lock_init(&mdsc->snap_flush_lock);
553adfd9 4373 mdsc->last_cap_flush_tid = 1;
e4500b5e 4374 INIT_LIST_HEAD(&mdsc->cap_flush_list);
2f2dc053 4375 INIT_LIST_HEAD(&mdsc->cap_dirty);
db354052 4376 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
2f2dc053
SW
4377 mdsc->num_cap_flushing = 0;
4378 spin_lock_init(&mdsc->cap_dirty_lock);
4379 init_waitqueue_head(&mdsc->cap_flushing_wq);
37c4efc1 4380 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
fe33032d 4381 atomic_set(&mdsc->cap_reclaim_pending, 0);
f9009efa
XL
4382 err = ceph_metric_init(&mdsc->metric);
4383 if (err)
4384 goto err_mdsmap;
37c4efc1
YZ
4385
4386 spin_lock_init(&mdsc->dentry_list_lock);
4387 INIT_LIST_HEAD(&mdsc->dentry_leases);
4388 INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
2d06eeb8 4389
37151668 4390 ceph_caps_init(mdsc);
fe33032d 4391 ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
37151668 4392
75c9627e
YZ
4393 spin_lock_init(&mdsc->snapid_map_lock);
4394 mdsc->snapid_map_tree = RB_ROOT;
4395 INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4396
10183a69
YZ
4397 init_rwsem(&mdsc->pool_perm_rwsem);
4398 mdsc->pool_perm_tree = RB_ROOT;
4399
dfeb84d4
YZ
4400 strscpy(mdsc->nodename, utsname()->nodename,
4401 sizeof(mdsc->nodename));
5f44f142 4402 return 0;
f9009efa
XL
4403
4404err_mdsmap:
4405 kfree(mdsc->mdsmap);
4406err_mdsc:
4407 kfree(mdsc);
4408 return err;
2f2dc053
SW
4409}
4410
4411/*
4412 * Wait for safe replies on open mds requests. If we time out, drop
4413 * all requests from the tree to avoid dangling dentry refs.
4414 */
4415static void wait_requests(struct ceph_mds_client *mdsc)
4416{
a319bf56 4417 struct ceph_options *opts = mdsc->fsc->client->options;
2f2dc053 4418 struct ceph_mds_request *req;
2f2dc053
SW
4419
4420 mutex_lock(&mdsc->mutex);
44ca18f2 4421 if (__get_oldest_req(mdsc)) {
2f2dc053 4422 mutex_unlock(&mdsc->mutex);
44ca18f2 4423
2f2dc053
SW
4424 dout("wait_requests waiting for requests\n");
4425 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
a319bf56 4426 ceph_timeout_jiffies(opts->mount_timeout));
2f2dc053
SW
4427
4428 /* tear down remaining requests */
44ca18f2
SW
4429 mutex_lock(&mdsc->mutex);
4430 while ((req = __get_oldest_req(mdsc))) {
2f2dc053
SW
4431 dout("wait_requests timed out on tid %llu\n",
4432 req->r_tid);
428138c9 4433 list_del_init(&req->r_wait);
44ca18f2 4434 __unregister_request(mdsc, req);
2f2dc053
SW
4435 }
4436 }
4437 mutex_unlock(&mdsc->mutex);
4438 dout("wait_requests done\n");
4439}
4440
4441/*
4442 * called before mount is ro, and before dentries are torn down.
4443 * (hmm, does this still race with new lookups?)
4444 */
4445void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4446{
4447 dout("pre_umount\n");
4448 mdsc->stopping = 1;
4449
7aac453a 4450 lock_unlock_sessions(mdsc);
afcdaea3 4451 ceph_flush_dirty_caps(mdsc);
2f2dc053 4452 wait_requests(mdsc);
17c688c3
SW
4453
4454 /*
4455 * wait for reply handlers to drop their request refs and
4456 * their inode/dcache refs
4457 */
4458 ceph_msgr_flush();
0c44a8e0
LH
4459
4460 ceph_cleanup_quotarealms_inodes(mdsc);
2f2dc053
SW
4461}
4462
4463/*
4464 * wait for all write mds requests to flush.
4465 */
4466static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4467{
80fc7314 4468 struct ceph_mds_request *req = NULL, *nextreq;
44ca18f2 4469 struct rb_node *n;
2f2dc053
SW
4470
4471 mutex_lock(&mdsc->mutex);
4472 dout("wait_unsafe_requests want %lld\n", want_tid);
80fc7314 4473restart:
44ca18f2
SW
4474 req = __get_oldest_req(mdsc);
4475 while (req && req->r_tid <= want_tid) {
80fc7314
SW
4476 /* find next request */
4477 n = rb_next(&req->r_node);
4478 if (n)
4479 nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4480 else
4481 nextreq = NULL;
e8a7b8b1
YZ
4482 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4483 (req->r_op & CEPH_MDS_OP_WRITE)) {
44ca18f2
SW
4484 /* write op */
4485 ceph_mdsc_get_request(req);
80fc7314
SW
4486 if (nextreq)
4487 ceph_mdsc_get_request(nextreq);
44ca18f2
SW
4488 mutex_unlock(&mdsc->mutex);
4489 dout("wait_unsafe_requests wait on %llu (want %llu)\n",
4490 req->r_tid, want_tid);
4491 wait_for_completion(&req->r_safe_completion);
4492 mutex_lock(&mdsc->mutex);
44ca18f2 4493 ceph_mdsc_put_request(req);
80fc7314
SW
4494 if (!nextreq)
4495 break; /* next dne before, so we're done! */
4496 if (RB_EMPTY_NODE(&nextreq->r_node)) {
4497 /* next request was removed from tree */
4498 ceph_mdsc_put_request(nextreq);
4499 goto restart;
4500 }
4501 ceph_mdsc_put_request(nextreq); /* won't go away */
44ca18f2 4502 }
80fc7314 4503 req = nextreq;
2f2dc053
SW
4504 }
4505 mutex_unlock(&mdsc->mutex);
4506 dout("wait_unsafe_requests done\n");
4507}
4508
4509void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4510{
0e294387 4511 u64 want_tid, want_flush;
2f2dc053 4512
52953d55 4513 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
56b7cf95
SW
4514 return;
4515
2f2dc053
SW
4516 dout("sync\n");
4517 mutex_lock(&mdsc->mutex);
4518 want_tid = mdsc->last_tid;
2f2dc053 4519 mutex_unlock(&mdsc->mutex);
2f2dc053 4520
afcdaea3 4521 ceph_flush_dirty_caps(mdsc);
d3383a8e 4522 spin_lock(&mdsc->cap_dirty_lock);
8310b089 4523 want_flush = mdsc->last_cap_flush_tid;
c8799fc4
YZ
4524 if (!list_empty(&mdsc->cap_flush_list)) {
4525 struct ceph_cap_flush *cf =
4526 list_last_entry(&mdsc->cap_flush_list,
4527 struct ceph_cap_flush, g_list);
4528 cf->wake = true;
4529 }
d3383a8e
YZ
4530 spin_unlock(&mdsc->cap_dirty_lock);
4531
0e294387
YZ
4532 dout("sync want tid %lld flush_seq %lld\n",
4533 want_tid, want_flush);
2f2dc053
SW
4534
4535 wait_unsafe_requests(mdsc, want_tid);
0e294387 4536 wait_caps_flush(mdsc, want_flush);
2f2dc053
SW
4537}
4538
f3c60c59
SW
4539/*
4540 * true if all sessions are closed, or we force unmount
4541 */
fcff415c 4542static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
f3c60c59 4543{
52953d55 4544 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
f3c60c59 4545 return true;
fcff415c 4546 return atomic_read(&mdsc->num_sessions) <= skipped;
f3c60c59 4547}
2f2dc053
SW
4548
4549/*
4550 * called after sb is ro.
4551 */
4552void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4553{
a319bf56 4554 struct ceph_options *opts = mdsc->fsc->client->options;
2f2dc053
SW
4555 struct ceph_mds_session *session;
4556 int i;
fcff415c 4557 int skipped = 0;
2f2dc053
SW
4558
4559 dout("close_sessions\n");
4560
2f2dc053 4561 /* close sessions */
f3c60c59
SW
4562 mutex_lock(&mdsc->mutex);
4563 for (i = 0; i < mdsc->max_sessions; i++) {
4564 session = __ceph_lookup_mds_session(mdsc, i);
4565 if (!session)
4566 continue;
2f2dc053 4567 mutex_unlock(&mdsc->mutex);
f3c60c59 4568 mutex_lock(&session->s_mutex);
fcff415c
YZ
4569 if (__close_session(mdsc, session) <= 0)
4570 skipped++;
f3c60c59
SW
4571 mutex_unlock(&session->s_mutex);
4572 ceph_put_mds_session(session);
2f2dc053
SW
4573 mutex_lock(&mdsc->mutex);
4574 }
f3c60c59
SW
4575 mutex_unlock(&mdsc->mutex);
4576
4577 dout("waiting for sessions to close\n");
fcff415c
YZ
4578 wait_event_timeout(mdsc->session_close_wq,
4579 done_closing_sessions(mdsc, skipped),
a319bf56 4580 ceph_timeout_jiffies(opts->mount_timeout));
2f2dc053
SW
4581
4582 /* tear down remaining sessions */
f3c60c59 4583 mutex_lock(&mdsc->mutex);
2f2dc053
SW
4584 for (i = 0; i < mdsc->max_sessions; i++) {
4585 if (mdsc->sessions[i]) {
5b3248c6 4586 session = ceph_get_mds_session(mdsc->sessions[i]);
2600d2dd 4587 __unregister_session(mdsc, session);
2f2dc053
SW
4588 mutex_unlock(&mdsc->mutex);
4589 mutex_lock(&session->s_mutex);
4590 remove_session_caps(session);
4591 mutex_unlock(&session->s_mutex);
4592 ceph_put_mds_session(session);
4593 mutex_lock(&mdsc->mutex);
4594 }
4595 }
2f2dc053 4596 WARN_ON(!list_empty(&mdsc->cap_delay_list));
2f2dc053
SW
4597 mutex_unlock(&mdsc->mutex);
4598
75c9627e 4599 ceph_cleanup_snapid_map(mdsc);
2f2dc053
SW
4600 ceph_cleanup_empty_realms(mdsc);
4601
37c4efc1 4602 cancel_work_sync(&mdsc->cap_reclaim_work);
2f2dc053
SW
4603 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4604
4605 dout("stopped\n");
4606}
4607
48fec5d0
YZ
4608void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4609{
4610 struct ceph_mds_session *session;
4611 int mds;
4612
4613 dout("force umount\n");
4614
4615 mutex_lock(&mdsc->mutex);
4616 for (mds = 0; mds < mdsc->max_sessions; mds++) {
4617 session = __ceph_lookup_mds_session(mdsc, mds);
4618 if (!session)
4619 continue;
d468e729
YZ
4620
4621 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4622 __unregister_session(mdsc, session);
4623 __wake_requests(mdsc, &session->s_waiting);
48fec5d0 4624 mutex_unlock(&mdsc->mutex);
d468e729 4625
48fec5d0
YZ
4626 mutex_lock(&session->s_mutex);
4627 __close_session(mdsc, session);
4628 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4629 cleanup_session_requests(mdsc, session);
4630 remove_session_caps(session);
4631 }
4632 mutex_unlock(&session->s_mutex);
4633 ceph_put_mds_session(session);
d468e729 4634
48fec5d0
YZ
4635 mutex_lock(&mdsc->mutex);
4636 kick_requests(mdsc, mds);
4637 }
4638 __wake_requests(mdsc, &mdsc->waiting_for_map);
4639 mutex_unlock(&mdsc->mutex);
4640}
4641
3d14c5d2 4642static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2f2dc053
SW
4643{
4644 dout("stop\n");
4645 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4646 if (mdsc->mdsmap)
4647 ceph_mdsmap_destroy(mdsc->mdsmap);
4648 kfree(mdsc->sessions);
37151668 4649 ceph_caps_finalize(mdsc);
10183a69 4650 ceph_pool_perm_destroy(mdsc);
2f2dc053
SW
4651}
4652
3d14c5d2
YS
4653void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4654{
4655 struct ceph_mds_client *mdsc = fsc->mdsc;
ef550f6f 4656 dout("mdsc_destroy %p\n", mdsc);
ef550f6f 4657
50c55aec
CX
4658 if (!mdsc)
4659 return;
4660
ef550f6f
SW
4661 /* flush out any connection work with references to us */
4662 ceph_msgr_flush();
4663
62a65f36
YZ
4664 ceph_mdsc_stop(mdsc);
4665
f9009efa
XL
4666 ceph_metric_destroy(&mdsc->metric);
4667
3d14c5d2
YS
4668 fsc->mdsc = NULL;
4669 kfree(mdsc);
ef550f6f 4670 dout("mdsc_destroy %p done\n", mdsc);
3d14c5d2
YS
4671}
4672
430afbad
YZ
4673void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4674{
4675 struct ceph_fs_client *fsc = mdsc->fsc;
4676 const char *mds_namespace = fsc->mount_options->mds_namespace;
4677 void *p = msg->front.iov_base;
4678 void *end = p + msg->front.iov_len;
4679 u32 epoch;
4680 u32 map_len;
4681 u32 num_fs;
4682 u32 mount_fscid = (u32)-1;
4683 u8 struct_v, struct_cv;
4684 int err = -EINVAL;
4685
4686 ceph_decode_need(&p, end, sizeof(u32), bad);
4687 epoch = ceph_decode_32(&p);
4688
4689 dout("handle_fsmap epoch %u\n", epoch);
4690
4691 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4692 struct_v = ceph_decode_8(&p);
4693 struct_cv = ceph_decode_8(&p);
4694 map_len = ceph_decode_32(&p);
4695
4696 ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4697 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4698
4699 num_fs = ceph_decode_32(&p);
4700 while (num_fs-- > 0) {
4701 void *info_p, *info_end;
4702 u32 info_len;
4703 u8 info_v, info_cv;
4704 u32 fscid, namelen;
4705
4706 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4707 info_v = ceph_decode_8(&p);
4708 info_cv = ceph_decode_8(&p);
4709 info_len = ceph_decode_32(&p);
4710 ceph_decode_need(&p, end, info_len, bad);
4711 info_p = p;
4712 info_end = p + info_len;
4713 p = info_end;
4714
4715 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4716 fscid = ceph_decode_32(&info_p);
4717 namelen = ceph_decode_32(&info_p);
4718 ceph_decode_need(&info_p, info_end, namelen, bad);
4719
4720 if (mds_namespace &&
4721 strlen(mds_namespace) == namelen &&
4722 !strncmp(mds_namespace, (char *)info_p, namelen)) {
4723 mount_fscid = fscid;
4724 break;
4725 }
4726 }
4727
4728 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4729 if (mount_fscid != (u32)-1) {
4730 fsc->client->monc.fs_cluster_id = mount_fscid;
4731 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4732 0, true);
4733 ceph_monc_renew_subs(&fsc->client->monc);
4734 } else {
4735 err = -ENOENT;
4736 goto err_out;
4737 }
4738 return;
76bd6ec4 4739
430afbad
YZ
4740bad:
4741 pr_err("error decoding fsmap\n");
4742err_out:
4743 mutex_lock(&mdsc->mutex);
76bd6ec4 4744 mdsc->mdsmap_err = err;
430afbad
YZ
4745 __wake_requests(mdsc, &mdsc->waiting_for_map);
4746 mutex_unlock(&mdsc->mutex);
430afbad 4747}
2f2dc053
SW
4748
4749/*
4750 * handle mds map update.
4751 */
430afbad 4752void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2f2dc053
SW
4753{
4754 u32 epoch;
4755 u32 maplen;
4756 void *p = msg->front.iov_base;
4757 void *end = p + msg->front.iov_len;
4758 struct ceph_mdsmap *newmap, *oldmap;
4759 struct ceph_fsid fsid;
4760 int err = -EINVAL;
4761
4762 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4763 ceph_decode_copy(&p, &fsid, sizeof(fsid));
3d14c5d2 4764 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
0743304d 4765 return;
c89136ea
SW
4766 epoch = ceph_decode_32(&p);
4767 maplen = ceph_decode_32(&p);
2f2dc053
SW
4768 dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4769
4770 /* do we need it? */
2f2dc053
SW
4771 mutex_lock(&mdsc->mutex);
4772 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4773 dout("handle_map epoch %u <= our %u\n",
4774 epoch, mdsc->mdsmap->m_epoch);
4775 mutex_unlock(&mdsc->mutex);
4776 return;
4777 }
4778
4779 newmap = ceph_mdsmap_decode(&p, end);
4780 if (IS_ERR(newmap)) {
4781 err = PTR_ERR(newmap);
4782 goto bad_unlock;
4783 }
4784
4785 /* swap into place */
4786 if (mdsc->mdsmap) {
4787 oldmap = mdsc->mdsmap;
4788 mdsc->mdsmap = newmap;
4789 check_new_map(mdsc, newmap, oldmap);
4790 ceph_mdsmap_destroy(oldmap);
4791 } else {
4792 mdsc->mdsmap = newmap; /* first mds map */
4793 }
719784ba
CX
4794 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4795 MAX_LFS_FILESIZE);
2f2dc053
SW
4796
4797 __wake_requests(mdsc, &mdsc->waiting_for_map);
82dcabad
ID
4798 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4799 mdsc->mdsmap->m_epoch);
2f2dc053
SW
4800
4801 mutex_unlock(&mdsc->mutex);
4802 schedule_delayed(mdsc);
4803 return;
4804
4805bad_unlock:
4806 mutex_unlock(&mdsc->mutex);
4807bad:
4808 pr_err("error decoding mdsmap %d\n", err);
4809 return;
4810}
4811
4812static struct ceph_connection *con_get(struct ceph_connection *con)
4813{
4814 struct ceph_mds_session *s = con->private;
4815
5b3248c6 4816 if (ceph_get_mds_session(s))
2f2dc053 4817 return con;
2f2dc053
SW
4818 return NULL;
4819}
4820
4821static void con_put(struct ceph_connection *con)
4822{
4823 struct ceph_mds_session *s = con->private;
4824
2f2dc053
SW
4825 ceph_put_mds_session(s);
4826}
4827
4828/*
4829 * if the client is unresponsive for long enough, the mds will kill
4830 * the session entirely.
4831 */
4832static void peer_reset(struct ceph_connection *con)
4833{
4834 struct ceph_mds_session *s = con->private;
7e70f0ed 4835 struct ceph_mds_client *mdsc = s->s_mdsc;
2f2dc053 4836
f3ae1b97 4837 pr_warn("mds%d closed our session\n", s->s_mds);
7e70f0ed 4838 send_mds_reconnect(mdsc, s);
2f2dc053
SW
4839}
4840
4841static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4842{
4843 struct ceph_mds_session *s = con->private;
4844 struct ceph_mds_client *mdsc = s->s_mdsc;
4845 int type = le16_to_cpu(msg->hdr.type);
4846
2600d2dd
SW
4847 mutex_lock(&mdsc->mutex);
4848 if (__verify_registered_session(mdsc, s) < 0) {
4849 mutex_unlock(&mdsc->mutex);
4850 goto out;
4851 }
4852 mutex_unlock(&mdsc->mutex);
4853
2f2dc053
SW
4854 switch (type) {
4855 case CEPH_MSG_MDS_MAP:
430afbad
YZ
4856 ceph_mdsc_handle_mdsmap(mdsc, msg);
4857 break;
4858 case CEPH_MSG_FS_MAP_USER:
4859 ceph_mdsc_handle_fsmap(mdsc, msg);
2f2dc053
SW
4860 break;
4861 case CEPH_MSG_CLIENT_SESSION:
4862 handle_session(s, msg);
4863 break;
4864 case CEPH_MSG_CLIENT_REPLY:
4865 handle_reply(s, msg);
4866 break;
4867 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2600d2dd 4868 handle_forward(mdsc, s, msg);
2f2dc053
SW
4869 break;
4870 case CEPH_MSG_CLIENT_CAPS:
4871 ceph_handle_caps(s, msg);
4872 break;
4873 case CEPH_MSG_CLIENT_SNAP:
2600d2dd 4874 ceph_handle_snap(mdsc, s, msg);
2f2dc053
SW
4875 break;
4876 case CEPH_MSG_CLIENT_LEASE:
2600d2dd 4877 handle_lease(mdsc, s, msg);
2f2dc053 4878 break;
fb18a575
LH
4879 case CEPH_MSG_CLIENT_QUOTA:
4880 ceph_handle_quota(mdsc, s, msg);
4881 break;
2f2dc053
SW
4882
4883 default:
4884 pr_err("received unknown message type %d %s\n", type,
4885 ceph_msg_type_name(type));
4886 }
2600d2dd 4887out:
2f2dc053
SW
4888 ceph_msg_put(msg);
4889}
4890
4e7a5dcd
SW
4891/*
4892 * authentication
4893 */
a3530df3
AE
4894
4895/*
4896 * Note: returned pointer is the address of a structure that's
4897 * managed separately. Caller must *not* attempt to free it.
4898 */
4899static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
8f43fb53 4900 int *proto, int force_new)
4e7a5dcd
SW
4901{
4902 struct ceph_mds_session *s = con->private;
4903 struct ceph_mds_client *mdsc = s->s_mdsc;
3d14c5d2 4904 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
74f1869f 4905 struct ceph_auth_handshake *auth = &s->s_auth;
4e7a5dcd 4906
74f1869f 4907 if (force_new && auth->authorizer) {
6c1ea260 4908 ceph_auth_destroy_authorizer(auth->authorizer);
74f1869f 4909 auth->authorizer = NULL;
4e7a5dcd 4910 }
27859f97
SW
4911 if (!auth->authorizer) {
4912 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4913 auth);
0bed9b5c
SW
4914 if (ret)
4915 return ERR_PTR(ret);
27859f97
SW
4916 } else {
4917 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4918 auth);
a255651d 4919 if (ret)
a3530df3 4920 return ERR_PTR(ret);
4e7a5dcd 4921 }
4e7a5dcd 4922 *proto = ac->protocol;
74f1869f 4923
a3530df3 4924 return auth;
4e7a5dcd
SW
4925}
4926
6daca13d
ID
4927static int add_authorizer_challenge(struct ceph_connection *con,
4928 void *challenge_buf, int challenge_buf_len)
4929{
4930 struct ceph_mds_session *s = con->private;
4931 struct ceph_mds_client *mdsc = s->s_mdsc;
4932 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4933
4934 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4935 challenge_buf, challenge_buf_len);
4936}
4e7a5dcd 4937
0dde5848 4938static int verify_authorizer_reply(struct ceph_connection *con)
4e7a5dcd
SW
4939{
4940 struct ceph_mds_session *s = con->private;
4941 struct ceph_mds_client *mdsc = s->s_mdsc;
3d14c5d2 4942 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4e7a5dcd 4943
0dde5848 4944 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4e7a5dcd
SW
4945}
4946
9bd2e6f8
SW
4947static int invalidate_authorizer(struct ceph_connection *con)
4948{
4949 struct ceph_mds_session *s = con->private;
4950 struct ceph_mds_client *mdsc = s->s_mdsc;
3d14c5d2 4951 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
9bd2e6f8 4952
27859f97 4953 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
9bd2e6f8 4954
3d14c5d2 4955 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
9bd2e6f8
SW
4956}
4957
53ded495
AE
4958static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4959 struct ceph_msg_header *hdr, int *skip)
4960{
4961 struct ceph_msg *msg;
4962 int type = (int) le16_to_cpu(hdr->type);
4963 int front_len = (int) le32_to_cpu(hdr->front_len);
4964
4965 if (con->in_msg)
4966 return con->in_msg;
4967
4968 *skip = 0;
4969 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4970 if (!msg) {
4971 pr_err("unable to allocate msg type %d len %d\n",
4972 type, front_len);
4973 return NULL;
4974 }
53ded495
AE
4975
4976 return msg;
4977}
4978
79dbd1ba 4979static int mds_sign_message(struct ceph_msg *msg)
33d07337 4980{
79dbd1ba 4981 struct ceph_mds_session *s = msg->con->private;
33d07337 4982 struct ceph_auth_handshake *auth = &s->s_auth;
79dbd1ba 4983
33d07337
YZ
4984 return ceph_auth_sign_message(auth, msg);
4985}
4986
79dbd1ba 4987static int mds_check_message_signature(struct ceph_msg *msg)
33d07337 4988{
79dbd1ba 4989 struct ceph_mds_session *s = msg->con->private;
33d07337 4990 struct ceph_auth_handshake *auth = &s->s_auth;
79dbd1ba 4991
33d07337
YZ
4992 return ceph_auth_check_message_signature(auth, msg);
4993}
4994
9e32789f 4995static const struct ceph_connection_operations mds_con_ops = {
2f2dc053
SW
4996 .get = con_get,
4997 .put = con_put,
4998 .dispatch = dispatch,
4e7a5dcd 4999 .get_authorizer = get_authorizer,
6daca13d 5000 .add_authorizer_challenge = add_authorizer_challenge,
4e7a5dcd 5001 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 5002 .invalidate_authorizer = invalidate_authorizer,
2f2dc053 5003 .peer_reset = peer_reset,
53ded495 5004 .alloc_msg = mds_alloc_msg,
79dbd1ba
ID
5005 .sign_message = mds_sign_message,
5006 .check_message_signature = mds_check_message_signature,
2f2dc053
SW
5007};
5008
2f2dc053 5009/* eof */