Merge tag 'sound-4.6-rc6' of git://git.kernel.org/pub/scm/linux/kernel/git/tiwai...
[linux-2.6-block.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/gfp.h>
7 #include <linux/sched.h>
8 #include <linux/debugfs.h>
9 #include <linux/seq_file.h>
10 #include <linux/utsname.h>
11 #include <linux/ratelimit.h>
12
13 #include "super.h"
14 #include "mds_client.h"
15
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/messenger.h>
18 #include <linux/ceph/decode.h>
19 #include <linux/ceph/pagelist.h>
20 #include <linux/ceph/auth.h>
21 #include <linux/ceph/debugfs.h>
22
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
47
48 struct ceph_reconnect_state {
49         int nr_caps;
50         struct ceph_pagelist *pagelist;
51         bool flock;
52 };
53
54 static void __wake_requests(struct ceph_mds_client *mdsc,
55                             struct list_head *head);
56
57 static const struct ceph_connection_operations mds_con_ops;
58
59
60 /*
61  * mds reply parsing
62  */
63
64 /*
65  * parse individual inode info
66  */
67 static int parse_reply_info_in(void **p, void *end,
68                                struct ceph_mds_reply_info_in *info,
69                                u64 features)
70 {
71         int err = -EIO;
72
73         info->in = *p;
74         *p += sizeof(struct ceph_mds_reply_inode) +
75                 sizeof(*info->in->fragtree.splits) *
76                 le32_to_cpu(info->in->fragtree.nsplits);
77
78         ceph_decode_32_safe(p, end, info->symlink_len, bad);
79         ceph_decode_need(p, end, info->symlink_len, bad);
80         info->symlink = *p;
81         *p += info->symlink_len;
82
83         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
84                 ceph_decode_copy_safe(p, end, &info->dir_layout,
85                                       sizeof(info->dir_layout), bad);
86         else
87                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
88
89         ceph_decode_32_safe(p, end, info->xattr_len, bad);
90         ceph_decode_need(p, end, info->xattr_len, bad);
91         info->xattr_data = *p;
92         *p += info->xattr_len;
93
94         if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
95                 ceph_decode_64_safe(p, end, info->inline_version, bad);
96                 ceph_decode_32_safe(p, end, info->inline_len, bad);
97                 ceph_decode_need(p, end, info->inline_len, bad);
98                 info->inline_data = *p;
99                 *p += info->inline_len;
100         } else
101                 info->inline_version = CEPH_INLINE_NONE;
102
103         if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
104                 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
105                 ceph_decode_need(p, end, info->pool_ns_len, bad);
106                 *p += info->pool_ns_len;
107         } else {
108                 info->pool_ns_len = 0;
109         }
110
111         return 0;
112 bad:
113         return err;
114 }
115
116 /*
117  * parse a normal reply, which may contain a (dir+)dentry and/or a
118  * target inode.
119  */
120 static int parse_reply_info_trace(void **p, void *end,
121                                   struct ceph_mds_reply_info_parsed *info,
122                                   u64 features)
123 {
124         int err;
125
126         if (info->head->is_dentry) {
127                 err = parse_reply_info_in(p, end, &info->diri, features);
128                 if (err < 0)
129                         goto out_bad;
130
131                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
132                         goto bad;
133                 info->dirfrag = *p;
134                 *p += sizeof(*info->dirfrag) +
135                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
136                 if (unlikely(*p > end))
137                         goto bad;
138
139                 ceph_decode_32_safe(p, end, info->dname_len, bad);
140                 ceph_decode_need(p, end, info->dname_len, bad);
141                 info->dname = *p;
142                 *p += info->dname_len;
143                 info->dlease = *p;
144                 *p += sizeof(*info->dlease);
145         }
146
147         if (info->head->is_target) {
148                 err = parse_reply_info_in(p, end, &info->targeti, features);
149                 if (err < 0)
150                         goto out_bad;
151         }
152
153         if (unlikely(*p != end))
154                 goto bad;
155         return 0;
156
157 bad:
158         err = -EIO;
159 out_bad:
160         pr_err("problem parsing mds trace %d\n", err);
161         return err;
162 }
163
164 /*
165  * parse readdir results
166  */
167 static int parse_reply_info_dir(void **p, void *end,
168                                 struct ceph_mds_reply_info_parsed *info,
169                                 u64 features)
170 {
171         u32 num, i = 0;
172         int err;
173
174         info->dir_dir = *p;
175         if (*p + sizeof(*info->dir_dir) > end)
176                 goto bad;
177         *p += sizeof(*info->dir_dir) +
178                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
179         if (*p > end)
180                 goto bad;
181
182         ceph_decode_need(p, end, sizeof(num) + 2, bad);
183         num = ceph_decode_32(p);
184         info->dir_end = ceph_decode_8(p);
185         info->dir_complete = ceph_decode_8(p);
186         if (num == 0)
187                 goto done;
188
189         BUG_ON(!info->dir_in);
190         info->dir_dname = (void *)(info->dir_in + num);
191         info->dir_dname_len = (void *)(info->dir_dname + num);
192         info->dir_dlease = (void *)(info->dir_dname_len + num);
193         if ((unsigned long)(info->dir_dlease + num) >
194             (unsigned long)info->dir_in + info->dir_buf_size) {
195                 pr_err("dir contents are larger than expected\n");
196                 WARN_ON(1);
197                 goto bad;
198         }
199
200         info->dir_nr = num;
201         while (num) {
202                 /* dentry */
203                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
204                 info->dir_dname_len[i] = ceph_decode_32(p);
205                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
206                 info->dir_dname[i] = *p;
207                 *p += info->dir_dname_len[i];
208                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
209                      info->dir_dname[i]);
210                 info->dir_dlease[i] = *p;
211                 *p += sizeof(struct ceph_mds_reply_lease);
212
213                 /* inode */
214                 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
215                 if (err < 0)
216                         goto out_bad;
217                 i++;
218                 num--;
219         }
220
221 done:
222         if (*p != end)
223                 goto bad;
224         return 0;
225
226 bad:
227         err = -EIO;
228 out_bad:
229         pr_err("problem parsing dir contents %d\n", err);
230         return err;
231 }
232
233 /*
234  * parse fcntl F_GETLK results
235  */
236 static int parse_reply_info_filelock(void **p, void *end,
237                                      struct ceph_mds_reply_info_parsed *info,
238                                      u64 features)
239 {
240         if (*p + sizeof(*info->filelock_reply) > end)
241                 goto bad;
242
243         info->filelock_reply = *p;
244         *p += sizeof(*info->filelock_reply);
245
246         if (unlikely(*p != end))
247                 goto bad;
248         return 0;
249
250 bad:
251         return -EIO;
252 }
253
254 /*
255  * parse create results
256  */
257 static int parse_reply_info_create(void **p, void *end,
258                                   struct ceph_mds_reply_info_parsed *info,
259                                   u64 features)
260 {
261         if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
262                 if (*p == end) {
263                         info->has_create_ino = false;
264                 } else {
265                         info->has_create_ino = true;
266                         info->ino = ceph_decode_64(p);
267                 }
268         }
269
270         if (unlikely(*p != end))
271                 goto bad;
272         return 0;
273
274 bad:
275         return -EIO;
276 }
277
278 /*
279  * parse extra results
280  */
281 static int parse_reply_info_extra(void **p, void *end,
282                                   struct ceph_mds_reply_info_parsed *info,
283                                   u64 features)
284 {
285         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
286                 return parse_reply_info_filelock(p, end, info, features);
287         else if (info->head->op == CEPH_MDS_OP_READDIR ||
288                  info->head->op == CEPH_MDS_OP_LSSNAP)
289                 return parse_reply_info_dir(p, end, info, features);
290         else if (info->head->op == CEPH_MDS_OP_CREATE)
291                 return parse_reply_info_create(p, end, info, features);
292         else
293                 return -EIO;
294 }
295
296 /*
297  * parse entire mds reply
298  */
299 static int parse_reply_info(struct ceph_msg *msg,
300                             struct ceph_mds_reply_info_parsed *info,
301                             u64 features)
302 {
303         void *p, *end;
304         u32 len;
305         int err;
306
307         info->head = msg->front.iov_base;
308         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
309         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
310
311         /* trace */
312         ceph_decode_32_safe(&p, end, len, bad);
313         if (len > 0) {
314                 ceph_decode_need(&p, end, len, bad);
315                 err = parse_reply_info_trace(&p, p+len, info, features);
316                 if (err < 0)
317                         goto out_bad;
318         }
319
320         /* extra */
321         ceph_decode_32_safe(&p, end, len, bad);
322         if (len > 0) {
323                 ceph_decode_need(&p, end, len, bad);
324                 err = parse_reply_info_extra(&p, p+len, info, features);
325                 if (err < 0)
326                         goto out_bad;
327         }
328
329         /* snap blob */
330         ceph_decode_32_safe(&p, end, len, bad);
331         info->snapblob_len = len;
332         info->snapblob = p;
333         p += len;
334
335         if (p != end)
336                 goto bad;
337         return 0;
338
339 bad:
340         err = -EIO;
341 out_bad:
342         pr_err("mds parse_reply err %d\n", err);
343         return err;
344 }
345
346 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
347 {
348         if (!info->dir_in)
349                 return;
350         free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
351 }
352
353
354 /*
355  * sessions
356  */
357 const char *ceph_session_state_name(int s)
358 {
359         switch (s) {
360         case CEPH_MDS_SESSION_NEW: return "new";
361         case CEPH_MDS_SESSION_OPENING: return "opening";
362         case CEPH_MDS_SESSION_OPEN: return "open";
363         case CEPH_MDS_SESSION_HUNG: return "hung";
364         case CEPH_MDS_SESSION_CLOSING: return "closing";
365         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
366         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
367         default: return "???";
368         }
369 }
370
371 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
372 {
373         if (atomic_inc_not_zero(&s->s_ref)) {
374                 dout("mdsc get_session %p %d -> %d\n", s,
375                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
376                 return s;
377         } else {
378                 dout("mdsc get_session %p 0 -- FAIL", s);
379                 return NULL;
380         }
381 }
382
383 void ceph_put_mds_session(struct ceph_mds_session *s)
384 {
385         dout("mdsc put_session %p %d -> %d\n", s,
386              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
387         if (atomic_dec_and_test(&s->s_ref)) {
388                 if (s->s_auth.authorizer)
389                         ceph_auth_destroy_authorizer(s->s_auth.authorizer);
390                 kfree(s);
391         }
392 }
393
394 /*
395  * called under mdsc->mutex
396  */
397 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
398                                                    int mds)
399 {
400         struct ceph_mds_session *session;
401
402         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
403                 return NULL;
404         session = mdsc->sessions[mds];
405         dout("lookup_mds_session %p %d\n", session,
406              atomic_read(&session->s_ref));
407         get_session(session);
408         return session;
409 }
410
411 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
412 {
413         if (mds >= mdsc->max_sessions)
414                 return false;
415         return mdsc->sessions[mds];
416 }
417
418 static int __verify_registered_session(struct ceph_mds_client *mdsc,
419                                        struct ceph_mds_session *s)
420 {
421         if (s->s_mds >= mdsc->max_sessions ||
422             mdsc->sessions[s->s_mds] != s)
423                 return -ENOENT;
424         return 0;
425 }
426
427 /*
428  * create+register a new session for given mds.
429  * called under mdsc->mutex.
430  */
431 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
432                                                  int mds)
433 {
434         struct ceph_mds_session *s;
435
436         if (mds >= mdsc->mdsmap->m_max_mds)
437                 return ERR_PTR(-EINVAL);
438
439         s = kzalloc(sizeof(*s), GFP_NOFS);
440         if (!s)
441                 return ERR_PTR(-ENOMEM);
442         s->s_mdsc = mdsc;
443         s->s_mds = mds;
444         s->s_state = CEPH_MDS_SESSION_NEW;
445         s->s_ttl = 0;
446         s->s_seq = 0;
447         mutex_init(&s->s_mutex);
448
449         ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
450
451         spin_lock_init(&s->s_gen_ttl_lock);
452         s->s_cap_gen = 0;
453         s->s_cap_ttl = jiffies - 1;
454
455         spin_lock_init(&s->s_cap_lock);
456         s->s_renew_requested = 0;
457         s->s_renew_seq = 0;
458         INIT_LIST_HEAD(&s->s_caps);
459         s->s_nr_caps = 0;
460         s->s_trim_caps = 0;
461         atomic_set(&s->s_ref, 1);
462         INIT_LIST_HEAD(&s->s_waiting);
463         INIT_LIST_HEAD(&s->s_unsafe);
464         s->s_num_cap_releases = 0;
465         s->s_cap_reconnect = 0;
466         s->s_cap_iterator = NULL;
467         INIT_LIST_HEAD(&s->s_cap_releases);
468         INIT_LIST_HEAD(&s->s_cap_flushing);
469         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
470
471         dout("register_session mds%d\n", mds);
472         if (mds >= mdsc->max_sessions) {
473                 int newmax = 1 << get_count_order(mds+1);
474                 struct ceph_mds_session **sa;
475
476                 dout("register_session realloc to %d\n", newmax);
477                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
478                 if (sa == NULL)
479                         goto fail_realloc;
480                 if (mdsc->sessions) {
481                         memcpy(sa, mdsc->sessions,
482                                mdsc->max_sessions * sizeof(void *));
483                         kfree(mdsc->sessions);
484                 }
485                 mdsc->sessions = sa;
486                 mdsc->max_sessions = newmax;
487         }
488         mdsc->sessions[mds] = s;
489         atomic_inc(&mdsc->num_sessions);
490         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
491
492         ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
493                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
494
495         return s;
496
497 fail_realloc:
498         kfree(s);
499         return ERR_PTR(-ENOMEM);
500 }
501
502 /*
503  * called under mdsc->mutex
504  */
505 static void __unregister_session(struct ceph_mds_client *mdsc,
506                                struct ceph_mds_session *s)
507 {
508         dout("__unregister_session mds%d %p\n", s->s_mds, s);
509         BUG_ON(mdsc->sessions[s->s_mds] != s);
510         mdsc->sessions[s->s_mds] = NULL;
511         ceph_con_close(&s->s_con);
512         ceph_put_mds_session(s);
513         atomic_dec(&mdsc->num_sessions);
514 }
515
516 /*
517  * drop session refs in request.
518  *
519  * should be last request ref, or hold mdsc->mutex
520  */
521 static void put_request_session(struct ceph_mds_request *req)
522 {
523         if (req->r_session) {
524                 ceph_put_mds_session(req->r_session);
525                 req->r_session = NULL;
526         }
527 }
528
529 void ceph_mdsc_release_request(struct kref *kref)
530 {
531         struct ceph_mds_request *req = container_of(kref,
532                                                     struct ceph_mds_request,
533                                                     r_kref);
534         destroy_reply_info(&req->r_reply_info);
535         if (req->r_request)
536                 ceph_msg_put(req->r_request);
537         if (req->r_reply)
538                 ceph_msg_put(req->r_reply);
539         if (req->r_inode) {
540                 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
541                 iput(req->r_inode);
542         }
543         if (req->r_locked_dir)
544                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
545         iput(req->r_target_inode);
546         if (req->r_dentry)
547                 dput(req->r_dentry);
548         if (req->r_old_dentry)
549                 dput(req->r_old_dentry);
550         if (req->r_old_dentry_dir) {
551                 /*
552                  * track (and drop pins for) r_old_dentry_dir
553                  * separately, since r_old_dentry's d_parent may have
554                  * changed between the dir mutex being dropped and
555                  * this request being freed.
556                  */
557                 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
558                                   CEPH_CAP_PIN);
559                 iput(req->r_old_dentry_dir);
560         }
561         kfree(req->r_path1);
562         kfree(req->r_path2);
563         if (req->r_pagelist)
564                 ceph_pagelist_release(req->r_pagelist);
565         put_request_session(req);
566         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
567         kfree(req);
568 }
569
570 /*
571  * lookup session, bump ref if found.
572  *
573  * called under mdsc->mutex.
574  */
575 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
576                                              u64 tid)
577 {
578         struct ceph_mds_request *req;
579         struct rb_node *n = mdsc->request_tree.rb_node;
580
581         while (n) {
582                 req = rb_entry(n, struct ceph_mds_request, r_node);
583                 if (tid < req->r_tid)
584                         n = n->rb_left;
585                 else if (tid > req->r_tid)
586                         n = n->rb_right;
587                 else {
588                         ceph_mdsc_get_request(req);
589                         return req;
590                 }
591         }
592         return NULL;
593 }
594
595 static void __insert_request(struct ceph_mds_client *mdsc,
596                              struct ceph_mds_request *new)
597 {
598         struct rb_node **p = &mdsc->request_tree.rb_node;
599         struct rb_node *parent = NULL;
600         struct ceph_mds_request *req = NULL;
601
602         while (*p) {
603                 parent = *p;
604                 req = rb_entry(parent, struct ceph_mds_request, r_node);
605                 if (new->r_tid < req->r_tid)
606                         p = &(*p)->rb_left;
607                 else if (new->r_tid > req->r_tid)
608                         p = &(*p)->rb_right;
609                 else
610                         BUG();
611         }
612
613         rb_link_node(&new->r_node, parent, p);
614         rb_insert_color(&new->r_node, &mdsc->request_tree);
615 }
616
617 /*
618  * Register an in-flight request, and assign a tid.  Link to directory
619  * are modifying (if any).
620  *
621  * Called under mdsc->mutex.
622  */
623 static void __register_request(struct ceph_mds_client *mdsc,
624                                struct ceph_mds_request *req,
625                                struct inode *dir)
626 {
627         req->r_tid = ++mdsc->last_tid;
628         if (req->r_num_caps)
629                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
630                                   req->r_num_caps);
631         dout("__register_request %p tid %lld\n", req, req->r_tid);
632         ceph_mdsc_get_request(req);
633         __insert_request(mdsc, req);
634
635         req->r_uid = current_fsuid();
636         req->r_gid = current_fsgid();
637
638         if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
639                 mdsc->oldest_tid = req->r_tid;
640
641         if (dir) {
642                 ihold(dir);
643                 req->r_unsafe_dir = dir;
644         }
645 }
646
647 static void __unregister_request(struct ceph_mds_client *mdsc,
648                                  struct ceph_mds_request *req)
649 {
650         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
651
652         if (req->r_tid == mdsc->oldest_tid) {
653                 struct rb_node *p = rb_next(&req->r_node);
654                 mdsc->oldest_tid = 0;
655                 while (p) {
656                         struct ceph_mds_request *next_req =
657                                 rb_entry(p, struct ceph_mds_request, r_node);
658                         if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
659                                 mdsc->oldest_tid = next_req->r_tid;
660                                 break;
661                         }
662                         p = rb_next(p);
663                 }
664         }
665
666         rb_erase(&req->r_node, &mdsc->request_tree);
667         RB_CLEAR_NODE(&req->r_node);
668
669         if (req->r_unsafe_dir && req->r_got_unsafe) {
670                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
671                 spin_lock(&ci->i_unsafe_lock);
672                 list_del_init(&req->r_unsafe_dir_item);
673                 spin_unlock(&ci->i_unsafe_lock);
674         }
675         if (req->r_target_inode && req->r_got_unsafe) {
676                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
677                 spin_lock(&ci->i_unsafe_lock);
678                 list_del_init(&req->r_unsafe_target_item);
679                 spin_unlock(&ci->i_unsafe_lock);
680         }
681
682         if (req->r_unsafe_dir) {
683                 iput(req->r_unsafe_dir);
684                 req->r_unsafe_dir = NULL;
685         }
686
687         complete_all(&req->r_safe_completion);
688
689         ceph_mdsc_put_request(req);
690 }
691
/*
 * __choose_mds() (defined after the get_nonsnap_parent() helper below):
 *
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
700 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
701 {
702         /*
703          * we don't need to worry about protecting the d_parent access
704          * here because we never renaming inside the snapped namespace
705          * except to resplice to another snapdir, and either the old or new
706          * result is a valid result.
707          */
708         while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
709                 dentry = dentry->d_parent;
710         return dentry;
711 }
712
713 static int __choose_mds(struct ceph_mds_client *mdsc,
714                         struct ceph_mds_request *req)
715 {
716         struct inode *inode;
717         struct ceph_inode_info *ci;
718         struct ceph_cap *cap;
719         int mode = req->r_direct_mode;
720         int mds = -1;
721         u32 hash = req->r_direct_hash;
722         bool is_hash = req->r_direct_is_hash;
723
724         /*
725          * is there a specific mds we should try?  ignore hint if we have
726          * no session and the mds is not up (active or recovering).
727          */
728         if (req->r_resend_mds >= 0 &&
729             (__have_session(mdsc, req->r_resend_mds) ||
730              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
731                 dout("choose_mds using resend_mds mds%d\n",
732                      req->r_resend_mds);
733                 return req->r_resend_mds;
734         }
735
736         if (mode == USE_RANDOM_MDS)
737                 goto random;
738
739         inode = NULL;
740         if (req->r_inode) {
741                 inode = req->r_inode;
742         } else if (req->r_dentry) {
743                 /* ignore race with rename; old or new d_parent is okay */
744                 struct dentry *parent = req->r_dentry->d_parent;
745                 struct inode *dir = d_inode(parent);
746
747                 if (dir->i_sb != mdsc->fsc->sb) {
748                         /* not this fs! */
749                         inode = d_inode(req->r_dentry);
750                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
751                         /* direct snapped/virtual snapdir requests
752                          * based on parent dir inode */
753                         struct dentry *dn = get_nonsnap_parent(parent);
754                         inode = d_inode(dn);
755                         dout("__choose_mds using nonsnap parent %p\n", inode);
756                 } else {
757                         /* dentry target */
758                         inode = d_inode(req->r_dentry);
759                         if (!inode || mode == USE_AUTH_MDS) {
760                                 /* dir + name */
761                                 inode = dir;
762                                 hash = ceph_dentry_hash(dir, req->r_dentry);
763                                 is_hash = true;
764                         }
765                 }
766         }
767
768         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
769              (int)hash, mode);
770         if (!inode)
771                 goto random;
772         ci = ceph_inode(inode);
773
774         if (is_hash && S_ISDIR(inode->i_mode)) {
775                 struct ceph_inode_frag frag;
776                 int found;
777
778                 ceph_choose_frag(ci, hash, &frag, &found);
779                 if (found) {
780                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
781                                 u8 r;
782
783                                 /* choose a random replica */
784                                 get_random_bytes(&r, 1);
785                                 r %= frag.ndist;
786                                 mds = frag.dist[r];
787                                 dout("choose_mds %p %llx.%llx "
788                                      "frag %u mds%d (%d/%d)\n",
789                                      inode, ceph_vinop(inode),
790                                      frag.frag, mds,
791                                      (int)r, frag.ndist);
792                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
793                                     CEPH_MDS_STATE_ACTIVE)
794                                         return mds;
795                         }
796
797                         /* since this file/dir wasn't known to be
798                          * replicated, then we want to look for the
799                          * authoritative mds. */
800                         mode = USE_AUTH_MDS;
801                         if (frag.mds >= 0) {
802                                 /* choose auth mds */
803                                 mds = frag.mds;
804                                 dout("choose_mds %p %llx.%llx "
805                                      "frag %u mds%d (auth)\n",
806                                      inode, ceph_vinop(inode), frag.frag, mds);
807                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
808                                     CEPH_MDS_STATE_ACTIVE)
809                                         return mds;
810                         }
811                 }
812         }
813
814         spin_lock(&ci->i_ceph_lock);
815         cap = NULL;
816         if (mode == USE_AUTH_MDS)
817                 cap = ci->i_auth_cap;
818         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
819                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
820         if (!cap) {
821                 spin_unlock(&ci->i_ceph_lock);
822                 goto random;
823         }
824         mds = cap->session->s_mds;
825         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
826              inode, ceph_vinop(inode), mds,
827              cap == ci->i_auth_cap ? "auth " : "", cap);
828         spin_unlock(&ci->i_ceph_lock);
829         return mds;
830
831 random:
832         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
833         dout("choose_mds chose random mds%d\n", mds);
834         return mds;
835 }
836
837
838 /*
839  * session messages
840  */
841 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
842 {
843         struct ceph_msg *msg;
844         struct ceph_mds_session_head *h;
845
846         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
847                            false);
848         if (!msg) {
849                 pr_err("create_session_msg ENOMEM creating msg\n");
850                 return NULL;
851         }
852         h = msg->front.iov_base;
853         h->op = cpu_to_le32(op);
854         h->seq = cpu_to_le64(seq);
855
856         return msg;
857 }
858
859 /*
860  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
861  * to include additional client metadata fields.
862  */
863 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
864 {
865         struct ceph_msg *msg;
866         struct ceph_mds_session_head *h;
867         int i = -1;
868         int metadata_bytes = 0;
869         int metadata_key_count = 0;
870         struct ceph_options *opt = mdsc->fsc->client->options;
871         void *p;
872
873         const char* metadata[][2] = {
874                 {"hostname", utsname()->nodename},
875                 {"kernel_version", utsname()->release},
876                 {"entity_id", opt->name ? opt->name : ""},
877                 {NULL, NULL}
878         };
879
880         /* Calculate serialized length of metadata */
881         metadata_bytes = 4;  /* map length */
882         for (i = 0; metadata[i][0] != NULL; ++i) {
883                 metadata_bytes += 8 + strlen(metadata[i][0]) +
884                         strlen(metadata[i][1]);
885                 metadata_key_count++;
886         }
887
888         /* Allocate the message */
889         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
890                            GFP_NOFS, false);
891         if (!msg) {
892                 pr_err("create_session_msg ENOMEM creating msg\n");
893                 return NULL;
894         }
895         h = msg->front.iov_base;
896         h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
897         h->seq = cpu_to_le64(seq);
898
899         /*
900          * Serialize client metadata into waiting buffer space, using
901          * the format that userspace expects for map<string, string>
902          *
903          * ClientSession messages with metadata are v2
904          */
905         msg->hdr.version = cpu_to_le16(2);
906         msg->hdr.compat_version = cpu_to_le16(1);
907
908         /* The write pointer, following the session_head structure */
909         p = msg->front.iov_base + sizeof(*h);
910
911         /* Number of entries in the map */
912         ceph_encode_32(&p, metadata_key_count);
913
914         /* Two length-prefixed strings for each entry in the map */
915         for (i = 0; metadata[i][0] != NULL; ++i) {
916                 size_t const key_len = strlen(metadata[i][0]);
917                 size_t const val_len = strlen(metadata[i][1]);
918
919                 ceph_encode_32(&p, key_len);
920                 memcpy(p, metadata[i][0], key_len);
921                 p += key_len;
922                 ceph_encode_32(&p, val_len);
923                 memcpy(p, metadata[i][1], val_len);
924                 p += val_len;
925         }
926
927         return msg;
928 }
929
930 /*
931  * send session open request.
932  *
933  * called under mdsc->mutex
934  */
935 static int __open_session(struct ceph_mds_client *mdsc,
936                           struct ceph_mds_session *session)
937 {
938         struct ceph_msg *msg;
939         int mstate;
940         int mds = session->s_mds;
941
942         /* wait for mds to go active? */
943         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
944         dout("open_session to mds%d (%s)\n", mds,
945              ceph_mds_state_name(mstate));
946         session->s_state = CEPH_MDS_SESSION_OPENING;
947         session->s_renew_requested = jiffies;
948
949         /* send connect message */
950         msg = create_session_open_msg(mdsc, session->s_seq);
951         if (!msg)
952                 return -ENOMEM;
953         ceph_con_send(&session->s_con, msg);
954         return 0;
955 }
956
957 /*
958  * open sessions for any export targets for the given mds
959  *
960  * called under mdsc->mutex
961  */
962 static struct ceph_mds_session *
963 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
964 {
965         struct ceph_mds_session *session;
966
967         session = __ceph_lookup_mds_session(mdsc, target);
968         if (!session) {
969                 session = register_session(mdsc, target);
970                 if (IS_ERR(session))
971                         return session;
972         }
973         if (session->s_state == CEPH_MDS_SESSION_NEW ||
974             session->s_state == CEPH_MDS_SESSION_CLOSING)
975                 __open_session(mdsc, session);
976
977         return session;
978 }
979
980 struct ceph_mds_session *
981 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
982 {
983         struct ceph_mds_session *session;
984
985         dout("open_export_target_session to mds%d\n", target);
986
987         mutex_lock(&mdsc->mutex);
988         session = __open_export_target_session(mdsc, target);
989         mutex_unlock(&mdsc->mutex);
990
991         return session;
992 }
993
994 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
995                                           struct ceph_mds_session *session)
996 {
997         struct ceph_mds_info *mi;
998         struct ceph_mds_session *ts;
999         int i, mds = session->s_mds;
1000
1001         if (mds >= mdsc->mdsmap->m_max_mds)
1002                 return;
1003
1004         mi = &mdsc->mdsmap->m_info[mds];
1005         dout("open_export_target_sessions for mds%d (%d targets)\n",
1006              session->s_mds, mi->num_export_targets);
1007
1008         for (i = 0; i < mi->num_export_targets; i++) {
1009                 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1010                 if (!IS_ERR(ts))
1011                         ceph_put_mds_session(ts);
1012         }
1013 }
1014
1015 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1016                                            struct ceph_mds_session *session)
1017 {
1018         mutex_lock(&mdsc->mutex);
1019         __open_export_target_sessions(mdsc, session);
1020         mutex_unlock(&mdsc->mutex);
1021 }
1022
1023 /*
1024  * session caps
1025  */
1026
1027 /* caller holds s_cap_lock, we drop it */
1028 static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
1029                                  struct ceph_mds_session *session)
1030         __releases(session->s_cap_lock)
1031 {
1032         LIST_HEAD(tmp_list);
1033         list_splice_init(&session->s_cap_releases, &tmp_list);
1034         session->s_num_cap_releases = 0;
1035         spin_unlock(&session->s_cap_lock);
1036
1037         dout("cleanup_cap_releases mds%d\n", session->s_mds);
1038         while (!list_empty(&tmp_list)) {
1039                 struct ceph_cap *cap;
1040                 /* zero out the in-progress message */
1041                 cap = list_first_entry(&tmp_list,
1042                                         struct ceph_cap, session_caps);
1043                 list_del(&cap->session_caps);
1044                 ceph_put_cap(mdsc, cap);
1045         }
1046 }
1047
1048 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1049                                      struct ceph_mds_session *session)
1050 {
1051         struct ceph_mds_request *req;
1052         struct rb_node *p;
1053
1054         dout("cleanup_session_requests mds%d\n", session->s_mds);
1055         mutex_lock(&mdsc->mutex);
1056         while (!list_empty(&session->s_unsafe)) {
1057                 req = list_first_entry(&session->s_unsafe,
1058                                        struct ceph_mds_request, r_unsafe_item);
1059                 list_del_init(&req->r_unsafe_item);
1060                 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1061                                     req->r_tid);
1062                 __unregister_request(mdsc, req);
1063         }
1064         /* zero r_attempts, so kick_requests() will re-send requests */
1065         p = rb_first(&mdsc->request_tree);
1066         while (p) {
1067                 req = rb_entry(p, struct ceph_mds_request, r_node);
1068                 p = rb_next(p);
1069                 if (req->r_session &&
1070                     req->r_session->s_mds == session->s_mds)
1071                         req->r_attempts = 0;
1072         }
1073         mutex_unlock(&mdsc->mutex);
1074 }
1075
1076 /*
1077  * Helper to safely iterate over all caps associated with a session, with
1078  * special care taken to handle a racing __ceph_remove_cap().
1079  *
1080  * Caller must hold session s_mutex.
1081  */
1082 static int iterate_session_caps(struct ceph_mds_session *session,
1083                                  int (*cb)(struct inode *, struct ceph_cap *,
1084                                             void *), void *arg)
1085 {
1086         struct list_head *p;
1087         struct ceph_cap *cap;
1088         struct inode *inode, *last_inode = NULL;
1089         struct ceph_cap *old_cap = NULL;
1090         int ret;
1091
1092         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1093         spin_lock(&session->s_cap_lock);
1094         p = session->s_caps.next;
1095         while (p != &session->s_caps) {
1096                 cap = list_entry(p, struct ceph_cap, session_caps);
1097                 inode = igrab(&cap->ci->vfs_inode);
1098                 if (!inode) {
1099                         p = p->next;
1100                         continue;
1101                 }
1102                 session->s_cap_iterator = cap;
1103                 spin_unlock(&session->s_cap_lock);
1104
1105                 if (last_inode) {
1106                         iput(last_inode);
1107                         last_inode = NULL;
1108                 }
1109                 if (old_cap) {
1110                         ceph_put_cap(session->s_mdsc, old_cap);
1111                         old_cap = NULL;
1112                 }
1113
1114                 ret = cb(inode, cap, arg);
1115                 last_inode = inode;
1116
1117                 spin_lock(&session->s_cap_lock);
1118                 p = p->next;
1119                 if (cap->ci == NULL) {
1120                         dout("iterate_session_caps  finishing cap %p removal\n",
1121                              cap);
1122                         BUG_ON(cap->session != session);
1123                         cap->session = NULL;
1124                         list_del_init(&cap->session_caps);
1125                         session->s_nr_caps--;
1126                         if (cap->queue_release) {
1127                                 list_add_tail(&cap->session_caps,
1128                                               &session->s_cap_releases);
1129                                 session->s_num_cap_releases++;
1130                         } else {
1131                                 old_cap = cap;  /* put_cap it w/o locks held */
1132                         }
1133                 }
1134                 if (ret < 0)
1135                         goto out;
1136         }
1137         ret = 0;
1138 out:
1139         session->s_cap_iterator = NULL;
1140         spin_unlock(&session->s_cap_lock);
1141
1142         iput(last_inode);
1143         if (old_cap)
1144                 ceph_put_cap(session->s_mdsc, old_cap);
1145
1146         return ret;
1147 }
1148
1149 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1150                                   void *arg)
1151 {
1152         struct ceph_inode_info *ci = ceph_inode(inode);
1153         LIST_HEAD(to_remove);
1154         int drop = 0;
1155
1156         dout("removing cap %p, ci is %p, inode is %p\n",
1157              cap, ci, &ci->vfs_inode);
1158         spin_lock(&ci->i_ceph_lock);
1159         __ceph_remove_cap(cap, false);
1160         if (!ci->i_auth_cap) {
1161                 struct ceph_cap_flush *cf;
1162                 struct ceph_mds_client *mdsc =
1163                         ceph_sb_to_client(inode->i_sb)->mdsc;
1164
1165                 while (true) {
1166                         struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
1167                         if (!n)
1168                                 break;
1169                         cf = rb_entry(n, struct ceph_cap_flush, i_node);
1170                         rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
1171                         list_add(&cf->list, &to_remove);
1172                 }
1173
1174                 spin_lock(&mdsc->cap_dirty_lock);
1175
1176                 list_for_each_entry(cf, &to_remove, list)
1177                         rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
1178
1179                 if (!list_empty(&ci->i_dirty_item)) {
1180                         pr_warn_ratelimited(
1181                                 " dropping dirty %s state for %p %lld\n",
1182                                 ceph_cap_string(ci->i_dirty_caps),
1183                                 inode, ceph_ino(inode));
1184                         ci->i_dirty_caps = 0;
1185                         list_del_init(&ci->i_dirty_item);
1186                         drop = 1;
1187                 }
1188                 if (!list_empty(&ci->i_flushing_item)) {
1189                         pr_warn_ratelimited(
1190                                 " dropping dirty+flushing %s state for %p %lld\n",
1191                                 ceph_cap_string(ci->i_flushing_caps),
1192                                 inode, ceph_ino(inode));
1193                         ci->i_flushing_caps = 0;
1194                         list_del_init(&ci->i_flushing_item);
1195                         mdsc->num_cap_flushing--;
1196                         drop = 1;
1197                 }
1198                 spin_unlock(&mdsc->cap_dirty_lock);
1199
1200                 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1201                         list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
1202                         ci->i_prealloc_cap_flush = NULL;
1203                 }
1204         }
1205         spin_unlock(&ci->i_ceph_lock);
1206         while (!list_empty(&to_remove)) {
1207                 struct ceph_cap_flush *cf;
1208                 cf = list_first_entry(&to_remove,
1209                                       struct ceph_cap_flush, list);
1210                 list_del(&cf->list);
1211                 ceph_free_cap_flush(cf);
1212         }
1213         while (drop--)
1214                 iput(inode);
1215         return 0;
1216 }
1217
1218 /*
1219  * caller must hold session s_mutex
1220  */
1221 static void remove_session_caps(struct ceph_mds_session *session)
1222 {
1223         dout("remove_session_caps on %p\n", session);
1224         iterate_session_caps(session, remove_session_caps_cb, NULL);
1225
1226         spin_lock(&session->s_cap_lock);
1227         if (session->s_nr_caps > 0) {
1228                 struct super_block *sb = session->s_mdsc->fsc->sb;
1229                 struct inode *inode;
1230                 struct ceph_cap *cap, *prev = NULL;
1231                 struct ceph_vino vino;
1232                 /*
1233                  * iterate_session_caps() skips inodes that are being
1234                  * deleted, we need to wait until deletions are complete.
1235                  * __wait_on_freeing_inode() is designed for the job,
1236                  * but it is not exported, so use lookup inode function
1237                  * to access it.
1238                  */
1239                 while (!list_empty(&session->s_caps)) {
1240                         cap = list_entry(session->s_caps.next,
1241                                          struct ceph_cap, session_caps);
1242                         if (cap == prev)
1243                                 break;
1244                         prev = cap;
1245                         vino = cap->ci->i_vino;
1246                         spin_unlock(&session->s_cap_lock);
1247
1248                         inode = ceph_find_inode(sb, vino);
1249                         iput(inode);
1250
1251                         spin_lock(&session->s_cap_lock);
1252                 }
1253         }
1254
1255         // drop cap expires and unlock s_cap_lock
1256         cleanup_cap_releases(session->s_mdsc, session);
1257
1258         BUG_ON(session->s_nr_caps > 0);
1259         BUG_ON(!list_empty(&session->s_cap_flushing));
1260 }
1261
1262 /*
1263  * wake up any threads waiting on this session's caps.  if the cap is
1264  * old (didn't get renewed on the client reconnect), remove it now.
1265  *
1266  * caller must hold s_mutex.
1267  */
1268 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1269                               void *arg)
1270 {
1271         struct ceph_inode_info *ci = ceph_inode(inode);
1272
1273         wake_up_all(&ci->i_cap_wq);
1274         if (arg) {
1275                 spin_lock(&ci->i_ceph_lock);
1276                 ci->i_wanted_max_size = 0;
1277                 ci->i_requested_max_size = 0;
1278                 spin_unlock(&ci->i_ceph_lock);
1279         }
1280         return 0;
1281 }
1282
1283 static void wake_up_session_caps(struct ceph_mds_session *session,
1284                                  int reconnect)
1285 {
1286         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1287         iterate_session_caps(session, wake_up_session_cb,
1288                              (void *)(unsigned long)reconnect);
1289 }
1290
1291 /*
1292  * Send periodic message to MDS renewing all currently held caps.  The
1293  * ack will reset the expiration for all caps from this session.
1294  *
1295  * caller holds s_mutex
1296  */
1297 static int send_renew_caps(struct ceph_mds_client *mdsc,
1298                            struct ceph_mds_session *session)
1299 {
1300         struct ceph_msg *msg;
1301         int state;
1302
1303         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1304             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1305                 pr_info("mds%d caps stale\n", session->s_mds);
1306         session->s_renew_requested = jiffies;
1307
1308         /* do not try to renew caps until a recovering mds has reconnected
1309          * with its clients. */
1310         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1311         if (state < CEPH_MDS_STATE_RECONNECT) {
1312                 dout("send_renew_caps ignoring mds%d (%s)\n",
1313                      session->s_mds, ceph_mds_state_name(state));
1314                 return 0;
1315         }
1316
1317         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1318                 ceph_mds_state_name(state));
1319         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1320                                  ++session->s_renew_seq);
1321         if (!msg)
1322                 return -ENOMEM;
1323         ceph_con_send(&session->s_con, msg);
1324         return 0;
1325 }
1326
1327 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1328                              struct ceph_mds_session *session, u64 seq)
1329 {
1330         struct ceph_msg *msg;
1331
1332         dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1333              session->s_mds, ceph_session_state_name(session->s_state), seq);
1334         msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1335         if (!msg)
1336                 return -ENOMEM;
1337         ceph_con_send(&session->s_con, msg);
1338         return 0;
1339 }
1340
1341
1342 /*
1343  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1344  *
1345  * Called under session->s_mutex
1346  */
1347 static void renewed_caps(struct ceph_mds_client *mdsc,
1348                          struct ceph_mds_session *session, int is_renew)
1349 {
1350         int was_stale;
1351         int wake = 0;
1352
1353         spin_lock(&session->s_cap_lock);
1354         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1355
1356         session->s_cap_ttl = session->s_renew_requested +
1357                 mdsc->mdsmap->m_session_timeout*HZ;
1358
1359         if (was_stale) {
1360                 if (time_before(jiffies, session->s_cap_ttl)) {
1361                         pr_info("mds%d caps renewed\n", session->s_mds);
1362                         wake = 1;
1363                 } else {
1364                         pr_info("mds%d caps still stale\n", session->s_mds);
1365                 }
1366         }
1367         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1368              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1369              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1370         spin_unlock(&session->s_cap_lock);
1371
1372         if (wake)
1373                 wake_up_session_caps(session, 0);
1374 }
1375
1376 /*
1377  * send a session close request
1378  */
1379 static int request_close_session(struct ceph_mds_client *mdsc,
1380                                  struct ceph_mds_session *session)
1381 {
1382         struct ceph_msg *msg;
1383
1384         dout("request_close_session mds%d state %s seq %lld\n",
1385              session->s_mds, ceph_session_state_name(session->s_state),
1386              session->s_seq);
1387         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1388         if (!msg)
1389                 return -ENOMEM;
1390         ceph_con_send(&session->s_con, msg);
1391         return 0;
1392 }
1393
1394 /*
1395  * Called with s_mutex held.
1396  */
1397 static int __close_session(struct ceph_mds_client *mdsc,
1398                          struct ceph_mds_session *session)
1399 {
1400         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1401                 return 0;
1402         session->s_state = CEPH_MDS_SESSION_CLOSING;
1403         return request_close_session(mdsc, session);
1404 }
1405
1406 /*
1407  * Trim old(er) caps.
1408  *
1409  * Because we can't cache an inode without one or more caps, we do
1410  * this indirectly: if a cap is unused, we prune its aliases, at which
1411  * point the inode will hopefully get dropped to.
1412  *
1413  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1414  * memory pressure from the MDS, though, so it needn't be perfect.
1415  */
1416 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1417 {
1418         struct ceph_mds_session *session = arg;
1419         struct ceph_inode_info *ci = ceph_inode(inode);
1420         int used, wanted, oissued, mine;
1421
1422         if (session->s_trim_caps <= 0)
1423                 return -1;
1424
1425         spin_lock(&ci->i_ceph_lock);
1426         mine = cap->issued | cap->implemented;
1427         used = __ceph_caps_used(ci);
1428         wanted = __ceph_caps_file_wanted(ci);
1429         oissued = __ceph_caps_issued_other(ci, cap);
1430
1431         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1432              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1433              ceph_cap_string(used), ceph_cap_string(wanted));
1434         if (cap == ci->i_auth_cap) {
1435                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1436                     !list_empty(&ci->i_cap_snaps))
1437                         goto out;
1438                 if ((used | wanted) & CEPH_CAP_ANY_WR)
1439                         goto out;
1440         }
1441         /* The inode has cached pages, but it's no longer used.
1442          * we can safely drop it */
1443         if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1444             !(oissued & CEPH_CAP_FILE_CACHE)) {
1445           used = 0;
1446           oissued = 0;
1447         }
1448         if ((used | wanted) & ~oissued & mine)
1449                 goto out;   /* we need these caps */
1450
1451         session->s_trim_caps--;
1452         if (oissued) {
1453                 /* we aren't the only cap.. just remove us */
1454                 __ceph_remove_cap(cap, true);
1455         } else {
1456                 /* try dropping referring dentries */
1457                 spin_unlock(&ci->i_ceph_lock);
1458                 d_prune_aliases(inode);
1459                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1460                      inode, cap, atomic_read(&inode->i_count));
1461                 return 0;
1462         }
1463
1464 out:
1465         spin_unlock(&ci->i_ceph_lock);
1466         return 0;
1467 }
1468
1469 /*
1470  * Trim session cap count down to some max number.
1471  */
1472 static int trim_caps(struct ceph_mds_client *mdsc,
1473                      struct ceph_mds_session *session,
1474                      int max_caps)
1475 {
1476         int trim_caps = session->s_nr_caps - max_caps;
1477
1478         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1479              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1480         if (trim_caps > 0) {
1481                 session->s_trim_caps = trim_caps;
1482                 iterate_session_caps(session, trim_caps_cb, session);
1483                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1484                      session->s_mds, session->s_nr_caps, max_caps,
1485                         trim_caps - session->s_trim_caps);
1486                 session->s_trim_caps = 0;
1487         }
1488
1489         ceph_send_cap_releases(mdsc, session);
1490         return 0;
1491 }
1492
1493 static int check_capsnap_flush(struct ceph_inode_info *ci,
1494                                u64 want_snap_seq)
1495 {
1496         int ret = 1;
1497         spin_lock(&ci->i_ceph_lock);
1498         if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
1499                 struct ceph_cap_snap *capsnap =
1500                         list_first_entry(&ci->i_cap_snaps,
1501                                          struct ceph_cap_snap, ci_item);
1502                 ret = capsnap->follows >= want_snap_seq;
1503         }
1504         spin_unlock(&ci->i_ceph_lock);
1505         return ret;
1506 }
1507
1508 static int check_caps_flush(struct ceph_mds_client *mdsc,
1509                             u64 want_flush_tid)
1510 {
1511         struct rb_node *n;
1512         struct ceph_cap_flush *cf;
1513         int ret = 1;
1514
1515         spin_lock(&mdsc->cap_dirty_lock);
1516         n = rb_first(&mdsc->cap_flush_tree);
1517         cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
1518         if (cf && cf->tid <= want_flush_tid) {
1519                 dout("check_caps_flush still flushing tid %llu <= %llu\n",
1520                      cf->tid, want_flush_tid);
1521                 ret = 0;
1522         }
1523         spin_unlock(&mdsc->cap_dirty_lock);
1524         return ret;
1525 }
1526
1527 /*
1528  * flush all dirty inode data to disk.
1529  *
1530  * returns true if we've flushed through want_flush_tid
1531  */
1532 static void wait_caps_flush(struct ceph_mds_client *mdsc,
1533                             u64 want_flush_tid, u64 want_snap_seq)
1534 {
1535         int mds;
1536
1537         dout("check_caps_flush want %llu snap want %llu\n",
1538              want_flush_tid, want_snap_seq);
1539         mutex_lock(&mdsc->mutex);
1540         for (mds = 0; mds < mdsc->max_sessions; ) {
1541                 struct ceph_mds_session *session = mdsc->sessions[mds];
1542                 struct inode *inode = NULL;
1543
1544                 if (!session) {
1545                         mds++;
1546                         continue;
1547                 }
1548                 get_session(session);
1549                 mutex_unlock(&mdsc->mutex);
1550
1551                 mutex_lock(&session->s_mutex);
1552                 if (!list_empty(&session->s_cap_snaps_flushing)) {
1553                         struct ceph_cap_snap *capsnap =
1554                                 list_first_entry(&session->s_cap_snaps_flushing,
1555                                                  struct ceph_cap_snap,
1556                                                  flushing_item);
1557                         struct ceph_inode_info *ci = capsnap->ci;
1558                         if (!check_capsnap_flush(ci, want_snap_seq)) {
1559                                 dout("check_cap_flush still flushing snap %p "
1560                                      "follows %lld <= %lld to mds%d\n",
1561                                      &ci->vfs_inode, capsnap->follows,
1562                                      want_snap_seq, mds);
1563                                 inode = igrab(&ci->vfs_inode);
1564                         }
1565                 }
1566                 mutex_unlock(&session->s_mutex);
1567                 ceph_put_mds_session(session);
1568
1569                 if (inode) {
1570                         wait_event(mdsc->cap_flushing_wq,
1571                                    check_capsnap_flush(ceph_inode(inode),
1572                                                        want_snap_seq));
1573                         iput(inode);
1574                 } else {
1575                         mds++;
1576                 }
1577
1578                 mutex_lock(&mdsc->mutex);
1579         }
1580         mutex_unlock(&mdsc->mutex);
1581
1582         wait_event(mdsc->cap_flushing_wq,
1583                    check_caps_flush(mdsc, want_flush_tid));
1584
1585         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1586 }
1587
1588 /*
1589  * called under s_mutex
1590  */
1591 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1592                             struct ceph_mds_session *session)
1593 {
1594         struct ceph_msg *msg = NULL;
1595         struct ceph_mds_cap_release *head;
1596         struct ceph_mds_cap_item *item;
1597         struct ceph_cap *cap;
1598         LIST_HEAD(tmp_list);
1599         int num_cap_releases;
1600
1601         spin_lock(&session->s_cap_lock);
1602 again:
1603         list_splice_init(&session->s_cap_releases, &tmp_list);
1604         num_cap_releases = session->s_num_cap_releases;
1605         session->s_num_cap_releases = 0;
1606         spin_unlock(&session->s_cap_lock);
1607
1608         while (!list_empty(&tmp_list)) {
1609                 if (!msg) {
1610                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1611                                         PAGE_SIZE, GFP_NOFS, false);
1612                         if (!msg)
1613                                 goto out_err;
1614                         head = msg->front.iov_base;
1615                         head->num = cpu_to_le32(0);
1616                         msg->front.iov_len = sizeof(*head);
1617                 }
1618                 cap = list_first_entry(&tmp_list, struct ceph_cap,
1619                                         session_caps);
1620                 list_del(&cap->session_caps);
1621                 num_cap_releases--;
1622
1623                 head = msg->front.iov_base;
1624                 le32_add_cpu(&head->num, 1);
1625                 item = msg->front.iov_base + msg->front.iov_len;
1626                 item->ino = cpu_to_le64(cap->cap_ino);
1627                 item->cap_id = cpu_to_le64(cap->cap_id);
1628                 item->migrate_seq = cpu_to_le32(cap->mseq);
1629                 item->seq = cpu_to_le32(cap->issue_seq);
1630                 msg->front.iov_len += sizeof(*item);
1631
1632                 ceph_put_cap(mdsc, cap);
1633
1634                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1635                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1636                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1637                         ceph_con_send(&session->s_con, msg);
1638                         msg = NULL;
1639                 }
1640         }
1641
1642         BUG_ON(num_cap_releases != 0);
1643
1644         spin_lock(&session->s_cap_lock);
1645         if (!list_empty(&session->s_cap_releases))
1646                 goto again;
1647         spin_unlock(&session->s_cap_lock);
1648
1649         if (msg) {
1650                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1651                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1652                 ceph_con_send(&session->s_con, msg);
1653         }
1654         return;
1655 out_err:
1656         pr_err("send_cap_releases mds%d, failed to allocate message\n",
1657                 session->s_mds);
1658         spin_lock(&session->s_cap_lock);
1659         list_splice(&tmp_list, &session->s_cap_releases);
1660         session->s_num_cap_releases += num_cap_releases;
1661         spin_unlock(&session->s_cap_lock);
1662 }
1663
1664 /*
1665  * requests
1666  */
1667
1668 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1669                                     struct inode *dir)
1670 {
1671         struct ceph_inode_info *ci = ceph_inode(dir);
1672         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1673         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1674         size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1675                       sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1676         int order, num_entries;
1677
1678         spin_lock(&ci->i_ceph_lock);
1679         num_entries = ci->i_files + ci->i_subdirs;
1680         spin_unlock(&ci->i_ceph_lock);
1681         num_entries = max(num_entries, 1);
1682         num_entries = min(num_entries, opt->max_readdir);
1683
1684         order = get_order(size * num_entries);
1685         while (order >= 0) {
1686                 rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
1687                                                         __GFP_NOWARN,
1688                                                         order);
1689                 if (rinfo->dir_in)
1690                         break;
1691                 order--;
1692         }
1693         if (!rinfo->dir_in)
1694                 return -ENOMEM;
1695
1696         num_entries = (PAGE_SIZE << order) / size;
1697         num_entries = min(num_entries, opt->max_readdir);
1698
1699         rinfo->dir_buf_size = PAGE_SIZE << order;
1700         req->r_num_caps = num_entries + 1;
1701         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1702         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1703         return 0;
1704 }
1705
1706 /*
1707  * Create an mds request.
1708  */
1709 struct ceph_mds_request *
1710 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1711 {
1712         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1713
1714         if (!req)
1715                 return ERR_PTR(-ENOMEM);
1716
1717         mutex_init(&req->r_fill_mutex);
1718         req->r_mdsc = mdsc;
1719         req->r_started = jiffies;
1720         req->r_resend_mds = -1;
1721         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1722         INIT_LIST_HEAD(&req->r_unsafe_target_item);
1723         req->r_fmode = -1;
1724         kref_init(&req->r_kref);
1725         INIT_LIST_HEAD(&req->r_wait);
1726         init_completion(&req->r_completion);
1727         init_completion(&req->r_safe_completion);
1728         INIT_LIST_HEAD(&req->r_unsafe_item);
1729
1730         req->r_stamp = current_fs_time(mdsc->fsc->sb);
1731
1732         req->r_op = op;
1733         req->r_direct_mode = mode;
1734         return req;
1735 }
1736
1737 /*
1738  * return oldest (lowest) request, tid in request tree, 0 if none.
1739  *
1740  * called under mdsc->mutex.
1741  */
1742 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1743 {
1744         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1745                 return NULL;
1746         return rb_entry(rb_first(&mdsc->request_tree),
1747                         struct ceph_mds_request, r_node);
1748 }
1749
1750 static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1751 {
1752         return mdsc->oldest_tid;
1753 }
1754
/*
 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
 * on build_path_from_dentry in fs/cifs/dir.c.
 *
 * If @stop_on_nosnap, generate path relative to the first non-snapped
 * inode.
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 *
 * On success returns the NUL-terminated path, stores its length (not
 * counting the terminator) in *@plen and stores in *@base the ino of
 * the dentry at which the upward walk stopped.  Returns
 * ERR_PTR(-EINVAL) if @dentry is NULL and ERR_PTR(-ENOMEM) if the
 * path buffer cannot be allocated.
 *
 * The function works in two passes (measure, then fill) and restarts
 * from scratch whenever the passes disagree or rename_lock changed.
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
                           int stop_on_nosnap)
{
        struct dentry *temp;
        char *path;
        int len, pos;
        unsigned seq;

        if (dentry == NULL)
                return ERR_PTR(-EINVAL);

retry:
        /* pass 1: walk towards the root and add up the needed length */
        len = 0;
        seq = read_seqbegin(&rename_lock);
        rcu_read_lock();
        for (temp = dentry; !IS_ROOT(temp);) {
                struct inode *inode = d_inode(temp);
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
                        len++;  /* slash only */
                else if (stop_on_nosnap && inode &&
                         ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                else
                        len += 1 + temp->d_name.len;
                temp = temp->d_parent;
        }
        rcu_read_unlock();
        if (len)
                len--;  /* no leading '/' */

        path = kmalloc(len+1, GFP_NOFS);
        if (path == NULL)
                return ERR_PTR(-ENOMEM);
        /* pass 2: fill the buffer from the end towards the front */
        pos = len;
        path[pos] = 0;  /* trailing null */
        rcu_read_lock();
        for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
                struct inode *inode;

                /* d_lock is held while d_inode and d_name are read */
                spin_lock(&temp->d_lock);
                inode = d_inode(temp);
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
                        /* snapdir contributes no name, only the '/' below */
                        dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
                        spin_unlock(&temp->d_lock);
                        break;
                } else {
                        pos -= temp->d_name.len;
                        if (pos < 0) {
                                /* name no longer fits; caught by pos != 0 below */
                                spin_unlock(&temp->d_lock);
                                break;
                        }
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
                }
                spin_unlock(&temp->d_lock);
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
        }
        rcu_read_unlock();
        /*
         * The fill pass must land exactly on offset 0 and rename_lock
         * must be unchanged since pass 1; otherwise discard and redo.
         */
        if (pos != 0 || read_seqretry(&rename_lock, seq)) {
                pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
                   lock the dentries above us to prevent this, but
                   retrying should be harmless) */
                kfree(path);
                goto retry;
        }

        /* temp is where the walk stopped; its ino is the path's base */
        *base = ceph_ino(d_inode(temp));
        *plen = len;
        dout("build_path on %p %d built %llx '%.*s'\n",
             dentry, d_count(dentry), *base, len, path);
        return path;
}
1845
1846 static int build_dentry_path(struct dentry *dentry,
1847                              const char **ppath, int *ppathlen, u64 *pino,
1848                              int *pfreepath)
1849 {
1850         char *path;
1851
1852         if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
1853                 *pino = ceph_ino(d_inode(dentry->d_parent));
1854                 *ppath = dentry->d_name.name;
1855                 *ppathlen = dentry->d_name.len;
1856                 return 0;
1857         }
1858         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1859         if (IS_ERR(path))
1860                 return PTR_ERR(path);
1861         *ppath = path;
1862         *pfreepath = 1;
1863         return 0;
1864 }
1865
1866 static int build_inode_path(struct inode *inode,
1867                             const char **ppath, int *ppathlen, u64 *pino,
1868                             int *pfreepath)
1869 {
1870         struct dentry *dentry;
1871         char *path;
1872
1873         if (ceph_snap(inode) == CEPH_NOSNAP) {
1874                 *pino = ceph_ino(inode);
1875                 *ppathlen = 0;
1876                 return 0;
1877         }
1878         dentry = d_find_alias(inode);
1879         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1880         dput(dentry);
1881         if (IS_ERR(path))
1882                 return PTR_ERR(path);
1883         *ppath = path;
1884         *pfreepath = 1;
1885         return 0;
1886 }
1887
1888 /*
1889  * request arguments may be specified via an inode *, a dentry *, or
1890  * an explicit ino+path.
1891  */
1892 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1893                                   const char *rpath, u64 rino,
1894                                   const char **ppath, int *pathlen,
1895                                   u64 *ino, int *freepath)
1896 {
1897         int r = 0;
1898
1899         if (rinode) {
1900                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1901                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1902                      ceph_snap(rinode));
1903         } else if (rdentry) {
1904                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1905                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1906                      *ppath);
1907         } else if (rpath || rino) {
1908                 *ino = rino;
1909                 *ppath = rpath;
1910                 *pathlen = rpath ? strlen(rpath) : 0;
1911                 dout(" path %.*s\n", *pathlen, rpath);
1912         }
1913
1914         return r;
1915 }
1916
/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req, destined for
 * @mds.
 *
 * The front section holds the request head, the two encoded
 * filepaths, any cap/dentry releases and finally the request time
 * stamp.  If @drop_cap_releases is true the releases are encoded and
 * then discarded: the release count is zeroed and the time stamp is
 * written at the release offset instead.
 *
 * Returns the new message, or an ERR_PTR on path-building or
 * allocation failure.  Any heap-allocated paths are freed before
 * returning, on both success and failure.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                                               struct ceph_mds_request *req,
                                               int mds, bool drop_cap_releases)
{
        struct ceph_msg *msg;
        struct ceph_mds_request_head *head;
        const char *path1 = NULL;
        const char *path2 = NULL;
        u64 ino1 = 0, ino2 = 0;
        int pathlen1 = 0, pathlen2 = 0;
        int freepath1 = 0, freepath2 = 0;
        int len;
        u16 releases;
        void *p, *end;
        int ret;

        /* first path: from r_inode, r_dentry, or explicit r_ino1/r_path1 */
        ret = set_request_path_attr(req->r_inode, req->r_dentry,
                              req->r_path1, req->r_ino1.ino,
                              &path1, &pathlen1, &ino1, &freepath1);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out;
        }

        /* second path: from r_old_dentry or explicit r_ino2/r_path2 */
        ret = set_request_path_attr(NULL, req->r_old_dentry,
                              req->r_path2, req->r_ino2.ino,
                              &path2, &pathlen2, &ino2, &freepath2);
        if (ret < 0) {
                msg = ERR_PTR(ret);
                goto out_free1;
        }

        /*
         * head + both path strings + a fixed overhead per filepath
         * + trailing time stamp.
         * NOTE(review): the per-path overhead presumably mirrors what
         * ceph_encode_filepath emits; confirm against that helper.
         */
        len = sizeof(*head) +
                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
                sizeof(struct ceph_timespec);

        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
                (!!req->r_inode_drop + !!req->r_dentry_drop +
                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
        if (req->r_dentry_drop)
                len += req->r_dentry->d_name.len;
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
        if (!msg) {
                msg = ERR_PTR(-ENOMEM);
                goto out_free2;
        }

        msg->hdr.version = cpu_to_le16(2);
        msg->hdr.tid = cpu_to_le64(req->r_tid);

        /* p is the write cursor into the front buffer, end its limit */
        head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(*head);
        end = msg->front.iov_base + msg->front.iov_len;

        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
        head->op = cpu_to_le32(req->r_op);
        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
        head->args = req->r_args;

        ceph_encode_filepath(&p, end, ino1, path1);
        ceph_encode_filepath(&p, end, ino2, path2);

        /* make note of release offset, in case we need to replay */
        req->r_request_release_offset = p - msg->front.iov_base;

        /* cap releases */
        releases = 0;
        if (req->r_inode_drop)
                releases += ceph_encode_inode_release(&p,
                      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
                      mds, req->r_inode_drop, req->r_inode_unless, 0);
        if (req->r_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_dentry,
                       mds, req->r_dentry_drop, req->r_dentry_unless);
        if (req->r_old_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
                       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
        if (req->r_old_inode_drop)
                releases += ceph_encode_inode_release(&p,
                      d_inode(req->r_old_dentry),
                      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

        /*
         * Caller asked for no releases in the message: rewind the
         * cursor so the time stamp overwrites whatever was encoded.
         */
        if (drop_cap_releases) {
                releases = 0;
                p = msg->front.iov_base + req->r_request_release_offset;
        }

        head->num_releases = cpu_to_le16(releases);

        /* time stamp */
        {
                struct ceph_timespec ts;
                ceph_encode_timespec(&ts, &req->r_stamp);
                ceph_encode_copy(&p, &ts, sizeof(ts));
        }

        /* trim the front to what was actually written */
        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        if (req->r_pagelist) {
                struct ceph_pagelist *pagelist = req->r_pagelist;
                /* extra pagelist reference taken before attaching it to msg */
                atomic_inc(&pagelist->refcnt);
                ceph_msg_data_add_pagelist(msg, pagelist);
                msg->hdr.data_len = cpu_to_le32(pagelist->length);
        } else {
                msg->hdr.data_len = 0;
        }

        msg->hdr.data_off = cpu_to_le16(0);

        /* success falls through the cleanup labels as well */
out_free2:
        if (freepath2)
                kfree((char *)path2);
out_free1:
        if (freepath1)
                kfree((char *)path1);
out:
        return msg;
}
2045
2046 /*
2047  * called under mdsc->mutex if error, under no mutex if
2048  * success.
2049  */
2050 static void complete_request(struct ceph_mds_client *mdsc,
2051                              struct ceph_mds_request *req)
2052 {
2053         if (req->r_callback)
2054                 req->r_callback(mdsc, req);
2055         else
2056                 complete_all(&req->r_completion);
2057 }
2058
2059 /*
2060  * called under mdsc->mutex
2061  */
2062 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2063                                   struct ceph_mds_request *req,
2064                                   int mds, bool drop_cap_releases)
2065 {
2066         struct ceph_mds_request_head *rhead;
2067         struct ceph_msg *msg;
2068         int flags = 0;
2069
2070         req->r_attempts++;
2071         if (req->r_inode) {
2072                 struct ceph_cap *cap =
2073                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2074
2075                 if (cap)
2076                         req->r_sent_on_mseq = cap->mseq;
2077                 else
2078                         req->r_sent_on_mseq = -1;
2079         }
2080         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2081              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2082
2083         if (req->r_got_unsafe) {
2084                 void *p;
2085                 /*
2086                  * Replay.  Do not regenerate message (and rebuild
2087                  * paths, etc.); just use the original message.
2088                  * Rebuilding paths will break for renames because
2089                  * d_move mangles the src name.
2090                  */
2091                 msg = req->r_request;
2092                 rhead = msg->front.iov_base;
2093
2094                 flags = le32_to_cpu(rhead->flags);
2095                 flags |= CEPH_MDS_FLAG_REPLAY;
2096                 rhead->flags = cpu_to_le32(flags);
2097
2098                 if (req->r_target_inode)
2099                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2100
2101                 rhead->num_retry = req->r_attempts - 1;
2102
2103                 /* remove cap/dentry releases from message */
2104                 rhead->num_releases = 0;
2105
2106                 /* time stamp */
2107                 p = msg->front.iov_base + req->r_request_release_offset;
2108                 {
2109                         struct ceph_timespec ts;
2110                         ceph_encode_timespec(&ts, &req->r_stamp);
2111                         ceph_encode_copy(&p, &ts, sizeof(ts));
2112                 }
2113
2114                 msg->front.iov_len = p - msg->front.iov_base;
2115                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2116                 return 0;
2117         }
2118
2119         if (req->r_request) {
2120                 ceph_msg_put(req->r_request);
2121                 req->r_request = NULL;
2122         }
2123         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2124         if (IS_ERR(msg)) {
2125                 req->r_err = PTR_ERR(msg);
2126                 return PTR_ERR(msg);
2127         }
2128         req->r_request = msg;
2129
2130         rhead = msg->front.iov_base;
2131         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2132         if (req->r_got_unsafe)
2133                 flags |= CEPH_MDS_FLAG_REPLAY;
2134         if (req->r_locked_dir)
2135                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2136         rhead->flags = cpu_to_le32(flags);
2137         rhead->num_fwd = req->r_num_fwd;
2138         rhead->num_retry = req->r_attempts - 1;
2139         rhead->ino = 0;
2140
2141         dout(" r_locked_dir = %p\n", req->r_locked_dir);
2142         return 0;
2143 }
2144
/*
 * send request, or put it on the appropriate wait list.
 *
 * A request that already carries an error or a result is left alone
 * (and unregistered if it was aborted).  Otherwise an mds is chosen;
 * if none is usable the request is queued on mdsc->waiting_for_map,
 * and if the session is not yet open it is queued on the session's
 * s_waiting list.  Only with an OPEN or HUNG session is the message
 * prepared and handed to the connection.
 *
 * Returns 0, or a negative errno if the request failed early; in the
 * failure case r_err is set, the request is completed and then
 * unregistered.
 */
static int __do_request(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct ceph_mds_session *session = NULL;
        int mds = -1;
        int err = 0;

        /* nothing more to send for a request that is already decided */
        if (req->r_err || req->r_got_result) {
                if (req->r_aborted)
                        __unregister_request(mdsc, req);
                goto out;
        }

        if (req->r_timeout &&
            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
                dout("do_request timed out\n");
                err = -EIO;
                goto finish;
        }
        if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
                dout("do_request forced umount\n");
                err = -EIO;
                goto finish;
        }

        /* drop any session from a previous attempt before re-choosing */
        put_request_session(req);

        mds = __choose_mds(mdsc, req);
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
                dout("do_request no mds or not active, waiting for map\n");
                list_add(&req->r_wait, &mdsc->waiting_for_map);
                goto out;
        }

        /* get, open session */
        session = __ceph_lookup_mds_session(mdsc, mds);
        if (!session) {
                session = register_session(mdsc, mds);
                if (IS_ERR(session)) {
                        err = PTR_ERR(session);
                        goto finish;
                }
        }
        /* the request holds its own session reference */
        req->r_session = get_session(session);

        dout("do_request mds%d session %p state %s\n", mds, session,
             ceph_session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
            session->s_state != CEPH_MDS_SESSION_HUNG) {
                /* kick off the open if nobody has, then wait on the session */
                if (session->s_state == CEPH_MDS_SESSION_NEW ||
                    session->s_state == CEPH_MDS_SESSION_CLOSING)
                        __open_session(mdsc, session);
                list_add(&req->r_wait, &session->s_waiting);
                goto out_session;
        }

        /* send request */
        req->r_resend_mds = -1;   /* forget any previous mds hint */

        if (req->r_request_started == 0)   /* note request start time */
                req->r_request_started = jiffies;

        err = __prepare_send_request(mdsc, req, mds, false);
        if (!err) {
                /* take a message ref before handing it to the connection */
                ceph_msg_get(req->r_request);
                ceph_con_send(&session->s_con, req->r_request);
        }

out_session:
        /* drop the local reference obtained by lookup/register above */
        ceph_put_mds_session(session);
finish:
        if (err) {
                dout("__do_request early error %d\n", err);
                req->r_err = err;
                complete_request(mdsc, req);
                __unregister_request(mdsc, req);
        }
out:
        return err;
}
2229
2230 /*
2231  * called under mdsc->mutex
2232  */
2233 static void __wake_requests(struct ceph_mds_client *mdsc,
2234                             struct list_head *head)
2235 {
2236         struct ceph_mds_request *req;
2237         LIST_HEAD(tmp_list);
2238
2239         list_splice_init(head, &tmp_list);
2240
2241         while (!list_empty(&tmp_list)) {
2242                 req = list_entry(tmp_list.next,
2243                                  struct ceph_mds_request, r_wait);
2244                 list_del_init(&req->r_wait);
2245                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2246                 __do_request(mdsc, req);
2247         }
2248 }
2249
2250 /*
2251  * Wake up threads with requests pending for @mds, so that they can
2252  * resubmit their requests to a possibly different mds.
2253  */
2254 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2255 {
2256         struct ceph_mds_request *req;
2257         struct rb_node *p = rb_first(&mdsc->request_tree);
2258
2259         dout("kick_requests mds%d\n", mds);
2260         while (p) {
2261                 req = rb_entry(p, struct ceph_mds_request, r_node);
2262                 p = rb_next(p);
2263                 if (req->r_got_unsafe)
2264                         continue;
2265                 if (req->r_attempts > 0)
2266                         continue; /* only new requests */
2267                 if (req->r_session &&
2268                     req->r_session->s_mds == mds) {
2269                         dout(" kicking tid %llu\n", req->r_tid);
2270                         list_del_init(&req->r_wait);
2271                         __do_request(mdsc, req);
2272                 }
2273         }
2274 }
2275
2276 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2277                               struct ceph_mds_request *req)
2278 {
2279         dout("submit_request on %p\n", req);
2280         mutex_lock(&mdsc->mutex);
2281         __register_request(mdsc, req, NULL);
2282         __do_request(mdsc, req);
2283         mutex_unlock(&mdsc->mutex);
2284 }
2285
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * @dir is passed through to __register_request().  The call blocks
 * until the request completes, times out, or the wait is interrupted
 * by a fatal signal.
 *
 * Returns the result from the MDS reply head if a reply was received,
 * otherwise the error that ended the wait or req->r_err.  Returns
 * -EIO without sending anything if the request targets a directory
 * with a pool_ns layout.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                         struct inode *dir,
                         struct ceph_mds_request *req)
{
        int err;

        dout("do_request on %p\n", req);

        /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
        if (req->r_inode)
                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
        if (req->r_locked_dir)
                ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_old_dentry_dir)
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);

        /*
         * deny access to directories with pool_ns layouts
         * NOTE(review): the CAP_PIN refs taken above are not dropped on
         * these early returns; presumably request teardown releases
         * them - confirm.
         */
        if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
            ceph_inode(req->r_inode)->i_pool_ns_len)
                return -EIO;
        if (req->r_locked_dir &&
            ceph_inode(req->r_locked_dir)->i_pool_ns_len)
                return -EIO;

        /* issue */
        mutex_lock(&mdsc->mutex);
        __register_request(mdsc, req, dir);
        __do_request(mdsc, req);

        /* failed before anything was sent; nothing to wait for */
        if (req->r_err) {
                err = req->r_err;
                goto out;
        }

        /* wait */
        mutex_unlock(&mdsc->mutex);
        dout("do_request waiting\n");
        if (!req->r_timeout && req->r_wait_for_completion) {
                /* request supplies its own wait routine */
                err = req->r_wait_for_completion(mdsc, req);
        } else {
                long timeleft = wait_for_completion_killable_timeout(
                                        &req->r_completion,
                                        ceph_timeout_jiffies(req->r_timeout));
                if (timeleft > 0)
                        err = 0;
                else if (!timeleft)
                        err = -EIO;  /* timed out */
                else
                        err = timeleft;  /* killed */
        }
        dout("do_request waited, got %d\n", err);
        mutex_lock(&mdsc->mutex);

        /* only abort if we didn't race with a real reply */
        if (req->r_got_result) {
                err = le32_to_cpu(req->r_reply_info.head->result);
        } else if (err < 0) {
                dout("aborted request %lld with %d\n", req->r_tid, err);

                /*
                 * ensure we aren't running concurrently with
                 * ceph_fill_trace or ceph_readdir_prepopulate, which
                 * rely on locks (dir mutex) held by our caller.
                 */
                mutex_lock(&req->r_fill_mutex);
                req->r_err = err;
                req->r_aborted = true;
                mutex_unlock(&req->r_fill_mutex);

                /* an aborted write op leaves the dir state unknown */
                if (req->r_locked_dir &&
                    (req->r_op & CEPH_MDS_OP_WRITE))
                        ceph_invalidate_dir_request(req);
        } else {
                err = req->r_err;
        }

out:
        mutex_unlock(&mdsc->mutex);
        dout("do_request %p done, result %d\n", req, err);
        return err;
}
2372
2373 /*
2374  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2375  * namespace request.
2376  */
2377 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2378 {
2379         struct inode *inode = req->r_locked_dir;
2380
2381         dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2382
2383         ceph_dir_clear_complete(inode);
2384         if (req->r_dentry)
2385                 ceph_invalidate_dentry_lease(req->r_dentry);
2386         if (req->r_old_dentry)
2387                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2388 }
2389
2390 /*
2391  * Handle mds reply.
2392  *
2393  * We take the session mutex and parse and process the reply immediately.
2394  * This preserves the logical ordering of replies, capabilities, etc., sent
2395  * by the MDS as they are applied to our local cache.
2396  */
2397 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2398 {
2399         struct ceph_mds_client *mdsc = session->s_mdsc;
2400         struct ceph_mds_request *req;
2401         struct ceph_mds_reply_head *head = msg->front.iov_base;
2402         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2403         struct ceph_snap_realm *realm;
2404         u64 tid;
2405         int err, result;
2406         int mds = session->s_mds;
2407
2408         if (msg->front.iov_len < sizeof(*head)) {
2409                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2410                 ceph_msg_dump(msg);
2411                 return;
2412         }
2413
2414         /* get request, session */
2415         tid = le64_to_cpu(msg->hdr.tid);
2416         mutex_lock(&mdsc->mutex);
2417         req = __lookup_request(mdsc, tid);
2418         if (!req) {
2419                 dout("handle_reply on unknown tid %llu\n", tid);
2420                 mutex_unlock(&mdsc->mutex);
2421                 return;
2422         }
2423         dout("handle_reply %p\n", req);
2424
2425         /* correct session? */
2426         if (req->r_session != session) {
2427                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2428                        " not mds%d\n", tid, session->s_mds,
2429                        req->r_session ? req->r_session->s_mds : -1);
2430                 mutex_unlock(&mdsc->mutex);
2431                 goto out;
2432         }
2433
2434         /* dup? */
2435         if ((req->r_got_unsafe && !head->safe) ||
2436             (req->r_got_safe && head->safe)) {
2437                 pr_warn("got a dup %s reply on %llu from mds%d\n",
2438                            head->safe ? "safe" : "unsafe", tid, mds);
2439                 mutex_unlock(&mdsc->mutex);
2440                 goto out;
2441         }
2442         if (req->r_got_safe) {
2443                 pr_warn("got unsafe after safe on %llu from mds%d\n",
2444                            tid, mds);
2445                 mutex_unlock(&mdsc->mutex);
2446                 goto out;
2447         }
2448
2449         result = le32_to_cpu(head->result);
2450
2451         /*
2452          * Handle an ESTALE
2453          * if we're not talking to the authority, send to them
2454          * if the authority has changed while we weren't looking,
2455          * send to new authority
2456          * Otherwise we just have to return an ESTALE
2457          */
2458         if (result == -ESTALE) {
2459                 dout("got ESTALE on request %llu", req->r_tid);
2460                 req->r_resend_mds = -1;
2461                 if (req->r_direct_mode != USE_AUTH_MDS) {
2462                         dout("not using auth, setting for that now");
2463                         req->r_direct_mode = USE_AUTH_MDS;
2464                         __do_request(mdsc, req);
2465                         mutex_unlock(&mdsc->mutex);
2466                         goto out;
2467                 } else  {
2468                         int mds = __choose_mds(mdsc, req);
2469                         if (mds >= 0 && mds != req->r_session->s_mds) {
2470                                 dout("but auth changed, so resending");
2471                                 __do_request(mdsc, req);
2472                                 mutex_unlock(&mdsc->mutex);
2473                                 goto out;
2474                         }
2475                 }
2476                 dout("have to return ESTALE on request %llu", req->r_tid);
2477         }
2478
2479
2480         if (head->safe) {
2481                 req->r_got_safe = true;
2482                 __unregister_request(mdsc, req);
2483
2484                 if (req->r_got_unsafe) {
2485                         /*
2486                          * We already handled the unsafe response, now do the
2487                          * cleanup.  No need to examine the response; the MDS
2488                          * doesn't include any result info in the safe
2489                          * response.  And even if it did, there is nothing
2490                          * useful we could do with a revised return value.
2491                          */
2492                         dout("got safe reply %llu, mds%d\n", tid, mds);
2493                         list_del_init(&req->r_unsafe_item);
2494
2495                         /* last unsafe request during umount? */
2496                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2497                                 complete_all(&mdsc->safe_umount_waiters);
2498                         mutex_unlock(&mdsc->mutex);
2499                         goto out;
2500                 }
2501         } else {
2502                 req->r_got_unsafe = true;
2503                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2504                 if (req->r_unsafe_dir) {
2505                         struct ceph_inode_info *ci =
2506                                         ceph_inode(req->r_unsafe_dir);
2507                         spin_lock(&ci->i_unsafe_lock);
2508                         list_add_tail(&req->r_unsafe_dir_item,
2509                                       &ci->i_unsafe_dirops);
2510                         spin_unlock(&ci->i_unsafe_lock);
2511                 }
2512         }
2513
2514         dout("handle_reply tid %lld result %d\n", tid, result);
2515         rinfo = &req->r_reply_info;
2516         err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2517         mutex_unlock(&mdsc->mutex);
2518
2519         mutex_lock(&session->s_mutex);
2520         if (err < 0) {
2521                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2522                 ceph_msg_dump(msg);
2523                 goto out_err;
2524         }
2525
2526         /* snap trace */
2527         realm = NULL;
2528         if (rinfo->snapblob_len) {
2529                 down_write(&mdsc->snap_rwsem);
2530                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2531                                 rinfo->snapblob + rinfo->snapblob_len,
2532                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2533                                 &realm);
2534                 downgrade_write(&mdsc->snap_rwsem);
2535         } else {
2536                 down_read(&mdsc->snap_rwsem);
2537         }
2538
2539         /* insert trace into our cache */
2540         mutex_lock(&req->r_fill_mutex);
2541         current->journal_info = req;
2542         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2543         if (err == 0) {
2544                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2545                                     req->r_op == CEPH_MDS_OP_LSSNAP))
2546                         ceph_readdir_prepopulate(req, req->r_session);
2547                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2548         }
2549         current->journal_info = NULL;
2550         mutex_unlock(&req->r_fill_mutex);
2551
2552         up_read(&mdsc->snap_rwsem);
2553         if (realm)
2554                 ceph_put_snap_realm(mdsc, realm);
2555
2556         if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
2557                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2558                 spin_lock(&ci->i_unsafe_lock);
2559                 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2560                 spin_unlock(&ci->i_unsafe_lock);
2561         }
2562 out_err:
2563         mutex_lock(&mdsc->mutex);
2564         if (!req->r_aborted) {
2565                 if (err) {
2566                         req->r_err = err;
2567                 } else {
2568                         req->r_reply =  ceph_msg_get(msg);
2569                         req->r_got_result = true;
2570                 }
2571         } else {
2572                 dout("reply arrived after request %lld was aborted\n", tid);
2573         }
2574         mutex_unlock(&mdsc->mutex);
2575
2576         mutex_unlock(&session->s_mutex);
2577
2578         /* kick calling process */
2579         complete_request(mdsc, req);
2580 out:
2581         ceph_mdsc_put_request(req);
2582         return;
2583 }
2584
2585
2586
2587 /*
2588  * handle mds notification that our request has been forwarded.
2589  */
2590 static void handle_forward(struct ceph_mds_client *mdsc,
2591                            struct ceph_mds_session *session,
2592                            struct ceph_msg *msg)
2593 {
2594         struct ceph_mds_request *req;
2595         u64 tid = le64_to_cpu(msg->hdr.tid);
2596         u32 next_mds;
2597         u32 fwd_seq;
2598         int err = -EINVAL;
2599         void *p = msg->front.iov_base;
2600         void *end = p + msg->front.iov_len;
2601
2602         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2603         next_mds = ceph_decode_32(&p);
2604         fwd_seq = ceph_decode_32(&p);
2605
2606         mutex_lock(&mdsc->mutex);
2607         req = __lookup_request(mdsc, tid);
2608         if (!req) {
2609                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2610                 goto out;  /* dup reply? */
2611         }
2612
2613         if (req->r_aborted) {
2614                 dout("forward tid %llu aborted, unregistering\n", tid);
2615                 __unregister_request(mdsc, req);
2616         } else if (fwd_seq <= req->r_num_fwd) {
2617                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2618                      tid, next_mds, req->r_num_fwd, fwd_seq);
2619         } else {
2620                 /* resend. forward race not possible; mds would drop */
2621                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2622                 BUG_ON(req->r_err);
2623                 BUG_ON(req->r_got_result);
2624                 req->r_attempts = 0;
2625                 req->r_num_fwd = fwd_seq;
2626                 req->r_resend_mds = next_mds;
2627                 put_request_session(req);
2628                 __do_request(mdsc, req);
2629         }
2630         ceph_mdsc_put_request(req);
2631 out:
2632         mutex_unlock(&mdsc->mutex);
2633         return;
2634
2635 bad:
2636         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2637 }
2638
2639 /*
2640  * handle a mds session control message
2641  */
2642 static void handle_session(struct ceph_mds_session *session,
2643                            struct ceph_msg *msg)
2644 {
2645         struct ceph_mds_client *mdsc = session->s_mdsc;
2646         u32 op;
2647         u64 seq;
2648         int mds = session->s_mds;
2649         struct ceph_mds_session_head *h = msg->front.iov_base;
2650         int wake = 0;
2651
2652         /* decode */
2653         if (msg->front.iov_len != sizeof(*h))
2654                 goto bad;
2655         op = le32_to_cpu(h->op);
2656         seq = le64_to_cpu(h->seq);
2657
2658         mutex_lock(&mdsc->mutex);
2659         if (op == CEPH_SESSION_CLOSE)
2660                 __unregister_session(mdsc, session);
2661         /* FIXME: this ttl calculation is generous */
2662         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2663         mutex_unlock(&mdsc->mutex);
2664
2665         mutex_lock(&session->s_mutex);
2666
2667         dout("handle_session mds%d %s %p state %s seq %llu\n",
2668              mds, ceph_session_op_name(op), session,
2669              ceph_session_state_name(session->s_state), seq);
2670
2671         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2672                 session->s_state = CEPH_MDS_SESSION_OPEN;
2673                 pr_info("mds%d came back\n", session->s_mds);
2674         }
2675
2676         switch (op) {
2677         case CEPH_SESSION_OPEN:
2678                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2679                         pr_info("mds%d reconnect success\n", session->s_mds);
2680                 session->s_state = CEPH_MDS_SESSION_OPEN;
2681                 renewed_caps(mdsc, session, 0);
2682                 wake = 1;
2683                 if (mdsc->stopping)
2684                         __close_session(mdsc, session);
2685                 break;
2686
2687         case CEPH_SESSION_RENEWCAPS:
2688                 if (session->s_renew_seq == seq)
2689                         renewed_caps(mdsc, session, 1);
2690                 break;
2691
2692         case CEPH_SESSION_CLOSE:
2693                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2694                         pr_info("mds%d reconnect denied\n", session->s_mds);
2695                 cleanup_session_requests(mdsc, session);
2696                 remove_session_caps(session);
2697                 wake = 2; /* for good measure */
2698                 wake_up_all(&mdsc->session_close_wq);
2699                 break;
2700
2701         case CEPH_SESSION_STALE:
2702                 pr_info("mds%d caps went stale, renewing\n",
2703                         session->s_mds);
2704                 spin_lock(&session->s_gen_ttl_lock);
2705                 session->s_cap_gen++;
2706                 session->s_cap_ttl = jiffies - 1;
2707                 spin_unlock(&session->s_gen_ttl_lock);
2708                 send_renew_caps(mdsc, session);
2709                 break;
2710
2711         case CEPH_SESSION_RECALL_STATE:
2712                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2713                 break;
2714
2715         case CEPH_SESSION_FLUSHMSG:
2716                 send_flushmsg_ack(mdsc, session, seq);
2717                 break;
2718
2719         case CEPH_SESSION_FORCE_RO:
2720                 dout("force_session_readonly %p\n", session);
2721                 spin_lock(&session->s_cap_lock);
2722                 session->s_readonly = true;
2723                 spin_unlock(&session->s_cap_lock);
2724                 wake_up_session_caps(session, 0);
2725                 break;
2726
2727         default:
2728                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2729                 WARN_ON(1);
2730         }
2731
2732         mutex_unlock(&session->s_mutex);
2733         if (wake) {
2734                 mutex_lock(&mdsc->mutex);
2735                 __wake_requests(mdsc, &session->s_waiting);
2736                 if (wake == 2)
2737                         kick_requests(mdsc, mds);
2738                 mutex_unlock(&mdsc->mutex);
2739         }
2740         return;
2741
2742 bad:
2743         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2744                (int)msg->front.iov_len);
2745         ceph_msg_dump(msg);
2746         return;
2747 }
2748
2749
2750 /*
2751  * called under session->mutex.
2752  */
2753 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2754                                    struct ceph_mds_session *session)
2755 {
2756         struct ceph_mds_request *req, *nreq;
2757         struct rb_node *p;
2758         int err;
2759
2760         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2761
2762         mutex_lock(&mdsc->mutex);
2763         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2764                 err = __prepare_send_request(mdsc, req, session->s_mds, true);
2765                 if (!err) {
2766                         ceph_msg_get(req->r_request);
2767                         ceph_con_send(&session->s_con, req->r_request);
2768                 }
2769         }
2770
2771         /*
2772          * also re-send old requests when MDS enters reconnect stage. So that MDS
2773          * can process completed request in clientreplay stage.
2774          */
2775         p = rb_first(&mdsc->request_tree);
2776         while (p) {
2777                 req = rb_entry(p, struct ceph_mds_request, r_node);
2778                 p = rb_next(p);
2779                 if (req->r_got_unsafe)
2780                         continue;
2781                 if (req->r_attempts == 0)
2782                         continue; /* only old requests */
2783                 if (req->r_session &&
2784                     req->r_session->s_mds == session->s_mds) {
2785                         err = __prepare_send_request(mdsc, req,
2786                                                      session->s_mds, true);
2787                         if (!err) {
2788                                 ceph_msg_get(req->r_request);
2789                                 ceph_con_send(&session->s_con, req->r_request);
2790                         }
2791                 }
2792         }
2793         mutex_unlock(&mdsc->mutex);
2794 }
2795
2796 /*
2797  * Encode information about a cap for a reconnect with the MDS.
2798  */
2799 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2800                           void *arg)
2801 {
2802         union {
2803                 struct ceph_mds_cap_reconnect v2;
2804                 struct ceph_mds_cap_reconnect_v1 v1;
2805         } rec;
2806         size_t reclen;
2807         struct ceph_inode_info *ci;
2808         struct ceph_reconnect_state *recon_state = arg;
2809         struct ceph_pagelist *pagelist = recon_state->pagelist;
2810         char *path;
2811         int pathlen, err;
2812         u64 pathbase;
2813         struct dentry *dentry;
2814
2815         ci = cap->ci;
2816
2817         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2818              inode, ceph_vinop(inode), cap, cap->cap_id,
2819              ceph_cap_string(cap->issued));
2820         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2821         if (err)
2822                 return err;
2823
2824         dentry = d_find_alias(inode);
2825         if (dentry) {
2826                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2827                 if (IS_ERR(path)) {
2828                         err = PTR_ERR(path);
2829                         goto out_dput;
2830                 }
2831         } else {
2832                 path = NULL;
2833                 pathlen = 0;
2834         }
2835         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2836         if (err)
2837                 goto out_free;
2838
2839         spin_lock(&ci->i_ceph_lock);
2840         cap->seq = 0;        /* reset cap seq */
2841         cap->issue_seq = 0;  /* and issue_seq */
2842         cap->mseq = 0;       /* and migrate_seq */
2843         cap->cap_gen = cap->session->s_cap_gen;
2844
2845         if (recon_state->flock) {
2846                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2847                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2848                 rec.v2.issued = cpu_to_le32(cap->issued);
2849                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2850                 rec.v2.pathbase = cpu_to_le64(pathbase);
2851                 rec.v2.flock_len = 0;
2852                 reclen = sizeof(rec.v2);
2853         } else {
2854                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2855                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2856                 rec.v1.issued = cpu_to_le32(cap->issued);
2857                 rec.v1.size = cpu_to_le64(inode->i_size);
2858                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2859                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2860                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2861                 rec.v1.pathbase = cpu_to_le64(pathbase);
2862                 reclen = sizeof(rec.v1);
2863         }
2864         spin_unlock(&ci->i_ceph_lock);
2865
2866         if (recon_state->flock) {
2867                 int num_fcntl_locks, num_flock_locks;
2868                 struct ceph_filelock *flocks;
2869
2870 encode_again:
2871                 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2872                 flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2873                                  sizeof(struct ceph_filelock), GFP_NOFS);
2874                 if (!flocks) {
2875                         err = -ENOMEM;
2876                         goto out_free;
2877                 }
2878                 err = ceph_encode_locks_to_buffer(inode, flocks,
2879                                                   num_fcntl_locks,
2880                                                   num_flock_locks);
2881                 if (err) {
2882                         kfree(flocks);
2883                         if (err == -ENOSPC)
2884                                 goto encode_again;
2885                         goto out_free;
2886                 }
2887                 /*
2888                  * number of encoded locks is stable, so copy to pagelist
2889                  */
2890                 rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2891                                     (num_fcntl_locks+num_flock_locks) *
2892                                     sizeof(struct ceph_filelock));
2893                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2894                 if (!err)
2895                         err = ceph_locks_to_pagelist(flocks, pagelist,
2896                                                      num_fcntl_locks,
2897                                                      num_flock_locks);
2898                 kfree(flocks);
2899         } else {
2900                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2901         }
2902
2903         recon_state->nr_caps++;
2904 out_free:
2905         kfree(path);
2906 out_dput:
2907         dput(dentry);
2908         return err;
2909 }
2910
2911
2912 /*
2913  * If an MDS fails and recovers, clients need to reconnect in order to
2914  * reestablish shared state.  This includes all caps issued through
2915  * this session _and_ the snap_realm hierarchy.  Because it's not
2916  * clear which snap realms the mds cares about, we send everything we
2917  * know about.. that ensures we'll then get any new info the
2918  * recovering MDS might have.
2919  *
2920  * This is a relatively heavyweight operation, but it's rare.
2921  *
2922  * called with mdsc->mutex held.
2923  */
2924 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2925                                struct ceph_mds_session *session)
2926 {
2927         struct ceph_msg *reply;
2928         struct rb_node *p;
2929         int mds = session->s_mds;
2930         int err = -ENOMEM;
2931         int s_nr_caps;
2932         struct ceph_pagelist *pagelist;
2933         struct ceph_reconnect_state recon_state;
2934
2935         pr_info("mds%d reconnect start\n", mds);
2936
2937         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2938         if (!pagelist)
2939                 goto fail_nopagelist;
2940         ceph_pagelist_init(pagelist);
2941
2942         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2943         if (!reply)
2944                 goto fail_nomsg;
2945
2946         mutex_lock(&session->s_mutex);
2947         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2948         session->s_seq = 0;
2949
2950         dout("session %p state %s\n", session,
2951              ceph_session_state_name(session->s_state));
2952
2953         spin_lock(&session->s_gen_ttl_lock);
2954         session->s_cap_gen++;
2955         spin_unlock(&session->s_gen_ttl_lock);
2956
2957         spin_lock(&session->s_cap_lock);
2958         /* don't know if session is readonly */
2959         session->s_readonly = 0;
2960         /*
2961          * notify __ceph_remove_cap() that we are composing cap reconnect.
2962          * If a cap get released before being added to the cap reconnect,
2963          * __ceph_remove_cap() should skip queuing cap release.
2964          */
2965         session->s_cap_reconnect = 1;
2966         /* drop old cap expires; we're about to reestablish that state */
2967         cleanup_cap_releases(mdsc, session);
2968
2969         /* trim unused caps to reduce MDS's cache rejoin time */
2970         if (mdsc->fsc->sb->s_root)
2971                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
2972
2973         ceph_con_close(&session->s_con);
2974         ceph_con_open(&session->s_con,
2975                       CEPH_ENTITY_TYPE_MDS, mds,
2976                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2977
2978         /* replay unsafe requests */
2979         replay_unsafe_requests(mdsc, session);
2980
2981         down_read(&mdsc->snap_rwsem);
2982
2983         /* traverse this session's caps */
2984         s_nr_caps = session->s_nr_caps;
2985         err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2986         if (err)
2987                 goto fail;
2988
2989         recon_state.nr_caps = 0;
2990         recon_state.pagelist = pagelist;
2991         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2992         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2993         if (err < 0)
2994                 goto fail;
2995
2996         spin_lock(&session->s_cap_lock);
2997         session->s_cap_reconnect = 0;
2998         spin_unlock(&session->s_cap_lock);
2999
3000         /*
3001          * snaprealms.  we provide mds with the ino, seq (version), and
3002          * parent for all of our realms.  If the mds has any newer info,
3003          * it will tell us.
3004          */
3005         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3006                 struct ceph_snap_realm *realm =
3007                         rb_entry(p, struct ceph_snap_realm, node);
3008                 struct ceph_mds_snaprealm_reconnect sr_rec;
3009
3010                 dout(" adding snap realm %llx seq %lld parent %llx\n",
3011                      realm->ino, realm->seq, realm->parent_ino);
3012                 sr_rec.ino = cpu_to_le64(realm->ino);
3013                 sr_rec.seq = cpu_to_le64(realm->seq);
3014                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3015                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3016                 if (err)
3017                         goto fail;
3018         }
3019
3020         if (recon_state.flock)
3021                 reply->hdr.version = cpu_to_le16(2);
3022
3023         /* raced with cap release? */
3024         if (s_nr_caps != recon_state.nr_caps) {
3025                 struct page *page = list_first_entry(&pagelist->head,
3026                                                      struct page, lru);
3027                 __le32 *addr = kmap_atomic(page);
3028                 *addr = cpu_to_le32(recon_state.nr_caps);
3029                 kunmap_atomic(addr);
3030         }
3031
3032         reply->hdr.data_len = cpu_to_le32(pagelist->length);
3033         ceph_msg_data_add_pagelist(reply, pagelist);
3034
3035         ceph_early_kick_flushing_caps(mdsc, session);
3036
3037         ceph_con_send(&session->s_con, reply);
3038
3039         mutex_unlock(&session->s_mutex);
3040
3041         mutex_lock(&mdsc->mutex);
3042         __wake_requests(mdsc, &session->s_waiting);
3043         mutex_unlock(&mdsc->mutex);
3044
3045         up_read(&mdsc->snap_rwsem);
3046         return;
3047
3048 fail:
3049         ceph_msg_put(reply);
3050         up_read(&mdsc->snap_rwsem);
3051         mutex_unlock(&session->s_mutex);
3052 fail_nomsg:
3053         ceph_pagelist_release(pagelist);
3054 fail_nopagelist:
3055         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3056         return;
3057 }
3058
3059
3060 /*
3061  * compare old and new mdsmaps, kicking requests
3062  * and closing out old connections as necessary
3063  *
3064  * called under mdsc->mutex.
3065  */
3066 static void check_new_map(struct ceph_mds_client *mdsc,
3067                           struct ceph_mdsmap *newmap,
3068                           struct ceph_mdsmap *oldmap)
3069 {
3070         int i;
3071         int oldstate, newstate;
3072         struct ceph_mds_session *s;
3073
3074         dout("check_new_map new %u old %u\n",
3075              newmap->m_epoch, oldmap->m_epoch);
3076
3077         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
3078                 if (mdsc->sessions[i] == NULL)
3079                         continue;
3080                 s = mdsc->sessions[i];
3081                 oldstate = ceph_mdsmap_get_state(oldmap, i);
3082                 newstate = ceph_mdsmap_get_state(newmap, i);
3083
3084                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3085                      i, ceph_mds_state_name(oldstate),
3086                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3087                      ceph_mds_state_name(newstate),
3088                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3089                      ceph_session_state_name(s->s_state));
3090
3091                 if (i >= newmap->m_max_mds ||
3092                     memcmp(ceph_mdsmap_get_addr(oldmap, i),
3093                            ceph_mdsmap_get_addr(newmap, i),
3094                            sizeof(struct ceph_entity_addr))) {
3095                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3096                                 /* the session never opened, just close it
3097                                  * out now */
3098                                 __wake_requests(mdsc, &s->s_waiting);
3099                                 __unregister_session(mdsc, s);
3100                         } else {
3101                                 /* just close it */
3102                                 mutex_unlock(&mdsc->mutex);
3103                                 mutex_lock(&s->s_mutex);
3104                                 mutex_lock(&mdsc->mutex);
3105                                 ceph_con_close(&s->s_con);
3106                                 mutex_unlock(&s->s_mutex);
3107                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
3108                         }
3109                 } else if (oldstate == newstate) {
3110                         continue;  /* nothing new with this mds */
3111                 }
3112
3113                 /*
3114                  * send reconnect?
3115                  */
3116                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3117                     newstate >= CEPH_MDS_STATE_RECONNECT) {
3118                         mutex_unlock(&mdsc->mutex);
3119                         send_mds_reconnect(mdsc, s);
3120                         mutex_lock(&mdsc->mutex);
3121                 }
3122
3123                 /*
3124                  * kick request on any mds that has gone active.
3125                  */
3126                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3127                     newstate >= CEPH_MDS_STATE_ACTIVE) {
3128                         if (oldstate != CEPH_MDS_STATE_CREATING &&
3129                             oldstate != CEPH_MDS_STATE_STARTING)
3130                                 pr_info("mds%d recovery completed\n", s->s_mds);
3131                         kick_requests(mdsc, i);
3132                         ceph_kick_flushing_caps(mdsc, s);
3133                         wake_up_session_caps(s, 1);
3134                 }
3135         }
3136
3137         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
3138                 s = mdsc->sessions[i];
3139                 if (!s)
3140                         continue;
3141                 if (!ceph_mdsmap_is_laggy(newmap, i))
3142                         continue;
3143                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3144                     s->s_state == CEPH_MDS_SESSION_HUNG ||
3145                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
3146                         dout(" connecting to export targets of laggy mds%d\n",
3147                              i);
3148                         __open_export_target_sessions(mdsc, s);
3149                 }
3150         }
3151 }
3152
3153
3154
3155 /*
3156  * leases
3157  */
3158
3159 /*
3160  * caller must hold session s_mutex, dentry->d_lock
3161  */
3162 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3163 {
3164         struct ceph_dentry_info *di = ceph_dentry(dentry);
3165
3166         ceph_put_mds_session(di->lease_session);
3167         di->lease_session = NULL;
3168 }
3169
3170 static void handle_lease(struct ceph_mds_client *mdsc,
3171                          struct ceph_mds_session *session,
3172                          struct ceph_msg *msg)
3173 {
3174         struct super_block *sb = mdsc->fsc->sb;
3175         struct inode *inode;
3176         struct dentry *parent, *dentry;
3177         struct ceph_dentry_info *di;
3178         int mds = session->s_mds;
3179         struct ceph_mds_lease *h = msg->front.iov_base;
3180         u32 seq;
3181         struct ceph_vino vino;
3182         struct qstr dname;
3183         int release = 0;
3184
3185         dout("handle_lease from mds%d\n", mds);
3186
3187         /* decode */
3188         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3189                 goto bad;
3190         vino.ino = le64_to_cpu(h->ino);
3191         vino.snap = CEPH_NOSNAP;
3192         seq = le32_to_cpu(h->seq);
3193         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3194         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3195         if (dname.len != get_unaligned_le32(h+1))
3196                 goto bad;
3197
3198         /* lookup inode */
3199         inode = ceph_find_inode(sb, vino);
3200         dout("handle_lease %s, ino %llx %p %.*s\n",
3201              ceph_lease_op_name(h->action), vino.ino, inode,
3202              dname.len, dname.name);
3203
3204         mutex_lock(&session->s_mutex);
3205         session->s_seq++;
3206
3207         if (inode == NULL) {
3208                 dout("handle_lease no inode %llx\n", vino.ino);
3209                 goto release;
3210         }
3211
3212         /* dentry */
3213         parent = d_find_alias(inode);
3214         if (!parent) {
3215                 dout("no parent dentry on inode %p\n", inode);
3216                 WARN_ON(1);
3217                 goto release;  /* hrm... */
3218         }
3219         dname.hash = full_name_hash(dname.name, dname.len);
3220         dentry = d_lookup(parent, &dname);
3221         dput(parent);
3222         if (!dentry)
3223                 goto release;
3224
3225         spin_lock(&dentry->d_lock);
3226         di = ceph_dentry(dentry);
3227         switch (h->action) {
3228         case CEPH_MDS_LEASE_REVOKE:
3229                 if (di->lease_session == session) {
3230                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3231                                 h->seq = cpu_to_le32(di->lease_seq);
3232                         __ceph_mdsc_drop_dentry_lease(dentry);
3233                 }
3234                 release = 1;
3235                 break;
3236
3237         case CEPH_MDS_LEASE_RENEW:
3238                 if (di->lease_session == session &&
3239                     di->lease_gen == session->s_cap_gen &&
3240                     di->lease_renew_from &&
3241                     di->lease_renew_after == 0) {
3242                         unsigned long duration =
3243                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3244
3245                         di->lease_seq = seq;
3246                         dentry->d_time = di->lease_renew_from + duration;
3247                         di->lease_renew_after = di->lease_renew_from +
3248                                 (duration >> 1);
3249                         di->lease_renew_from = 0;
3250                 }
3251                 break;
3252         }
3253         spin_unlock(&dentry->d_lock);
3254         dput(dentry);
3255
3256         if (!release)
3257                 goto out;
3258
3259 release:
3260         /* let's just reuse the same message */
3261         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3262         ceph_msg_get(msg);
3263         ceph_con_send(&session->s_con, msg);
3264
3265 out:
3266         iput(inode);
3267         mutex_unlock(&session->s_mutex);
3268         return;
3269
3270 bad:
3271         pr_err("corrupt lease message\n");
3272         ceph_msg_dump(msg);
3273 }
3274
3275 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3276                               struct inode *inode,
3277                               struct dentry *dentry, char action,
3278                               u32 seq)
3279 {
3280         struct ceph_msg *msg;
3281         struct ceph_mds_lease *lease;
3282         int len = sizeof(*lease) + sizeof(u32);
3283         int dnamelen = 0;
3284
3285         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3286              inode, dentry, ceph_lease_op_name(action), session->s_mds);
3287         dnamelen = dentry->d_name.len;
3288         len += dnamelen;
3289
3290         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3291         if (!msg)
3292                 return;
3293         lease = msg->front.iov_base;
3294         lease->action = action;
3295         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3296         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3297         lease->seq = cpu_to_le32(seq);
3298         put_unaligned_le32(dnamelen, lease + 1);
3299         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3300
3301         /*
3302          * if this is a preemptive lease RELEASE, no need to
3303          * flush request stream, since the actual request will
3304          * soon follow.
3305          */
3306         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3307
3308         ceph_con_send(&session->s_con, msg);
3309 }
3310
3311 /*
3312  * Preemptively release a lease we expect to invalidate anyway.
3313  * Pass @inode always, @dentry is optional.
3314  */
3315 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3316                              struct dentry *dentry)
3317 {
3318         struct ceph_dentry_info *di;
3319         struct ceph_mds_session *session;
3320         u32 seq;
3321
3322         BUG_ON(inode == NULL);
3323         BUG_ON(dentry == NULL);
3324
3325         /* is dentry lease valid? */
3326         spin_lock(&dentry->d_lock);
3327         di = ceph_dentry(dentry);
3328         if (!di || !di->lease_session ||
3329             di->lease_session->s_mds < 0 ||
3330             di->lease_gen != di->lease_session->s_cap_gen ||
3331             !time_before(jiffies, dentry->d_time)) {
3332                 dout("lease_release inode %p dentry %p -- "
3333                      "no lease\n",
3334                      inode, dentry);
3335                 spin_unlock(&dentry->d_lock);
3336                 return;
3337         }
3338
3339         /* we do have a lease on this dentry; note mds and seq */
3340         session = ceph_get_mds_session(di->lease_session);
3341         seq = di->lease_seq;
3342         __ceph_mdsc_drop_dentry_lease(dentry);
3343         spin_unlock(&dentry->d_lock);
3344
3345         dout("lease_release inode %p dentry %p to mds%d\n",
3346              inode, dentry, session->s_mds);
3347         ceph_mdsc_lease_send_msg(session, inode, dentry,
3348                                  CEPH_MDS_LEASE_RELEASE, seq);
3349         ceph_put_mds_session(session);
3350 }
3351
3352 /*
3353  * drop all leases (and dentry refs) in preparation for umount
3354  */
3355 static void drop_leases(struct ceph_mds_client *mdsc)
3356 {
3357         int i;
3358
3359         dout("drop_leases\n");
3360         mutex_lock(&mdsc->mutex);
3361         for (i = 0; i < mdsc->max_sessions; i++) {
3362                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3363                 if (!s)
3364                         continue;
3365                 mutex_unlock(&mdsc->mutex);
3366                 mutex_lock(&s->s_mutex);
3367                 mutex_unlock(&s->s_mutex);
3368                 ceph_put_mds_session(s);
3369                 mutex_lock(&mdsc->mutex);
3370         }
3371         mutex_unlock(&mdsc->mutex);
3372 }
3373
3374
3375
3376 /*
3377  * delayed work -- periodically trim expired leases, renew caps with mds
3378  */
3379 static void schedule_delayed(struct ceph_mds_client *mdsc)
3380 {
3381         int delay = 5;
3382         unsigned hz = round_jiffies_relative(HZ * delay);
3383         schedule_delayed_work(&mdsc->delayed_work, hz);
3384 }
3385
3386 static void delayed_work(struct work_struct *work)
3387 {
3388         int i;
3389         struct ceph_mds_client *mdsc =
3390                 container_of(work, struct ceph_mds_client, delayed_work.work);
3391         int renew_interval;
3392         int renew_caps;
3393
3394         dout("mdsc delayed_work\n");
3395         ceph_check_delayed_caps(mdsc);
3396
3397         mutex_lock(&mdsc->mutex);
3398         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3399         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3400                                    mdsc->last_renew_caps);
3401         if (renew_caps)
3402                 mdsc->last_renew_caps = jiffies;
3403
3404         for (i = 0; i < mdsc->max_sessions; i++) {
3405                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3406                 if (s == NULL)
3407                         continue;
3408                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3409                         dout("resending session close request for mds%d\n",
3410                              s->s_mds);
3411                         request_close_session(mdsc, s);
3412                         ceph_put_mds_session(s);
3413                         continue;
3414                 }
3415                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3416                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3417                                 s->s_state = CEPH_MDS_SESSION_HUNG;
3418                                 pr_info("mds%d hung\n", s->s_mds);
3419                         }
3420                 }
3421                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3422                         /* this mds is failed or recovering, just wait */
3423                         ceph_put_mds_session(s);
3424                         continue;
3425                 }
3426                 mutex_unlock(&mdsc->mutex);
3427
3428                 mutex_lock(&s->s_mutex);
3429                 if (renew_caps)
3430                         send_renew_caps(mdsc, s);
3431                 else
3432                         ceph_con_keepalive(&s->s_con);
3433                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3434                     s->s_state == CEPH_MDS_SESSION_HUNG)
3435                         ceph_send_cap_releases(mdsc, s);
3436                 mutex_unlock(&s->s_mutex);
3437                 ceph_put_mds_session(s);
3438
3439                 mutex_lock(&mdsc->mutex);
3440         }
3441         mutex_unlock(&mdsc->mutex);
3442
3443         schedule_delayed(mdsc);
3444 }
3445
3446 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3447
3448 {
3449         struct ceph_mds_client *mdsc;
3450
3451         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3452         if (!mdsc)
3453                 return -ENOMEM;
3454         mdsc->fsc = fsc;
3455         fsc->mdsc = mdsc;
3456         mutex_init(&mdsc->mutex);
3457         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3458         if (mdsc->mdsmap == NULL) {
3459                 kfree(mdsc);
3460                 return -ENOMEM;
3461         }
3462
3463         init_completion(&mdsc->safe_umount_waiters);
3464         init_waitqueue_head(&mdsc->session_close_wq);
3465         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3466         mdsc->sessions = NULL;
3467         atomic_set(&mdsc->num_sessions, 0);
3468         mdsc->max_sessions = 0;
3469         mdsc->stopping = 0;
3470         mdsc->last_snap_seq = 0;
3471         init_rwsem(&mdsc->snap_rwsem);
3472         mdsc->snap_realms = RB_ROOT;
3473         INIT_LIST_HEAD(&mdsc->snap_empty);
3474         spin_lock_init(&mdsc->snap_empty_lock);
3475         mdsc->last_tid = 0;
3476         mdsc->oldest_tid = 0;
3477         mdsc->request_tree = RB_ROOT;
3478         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3479         mdsc->last_renew_caps = jiffies;
3480         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3481         spin_lock_init(&mdsc->cap_delay_lock);
3482         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3483         spin_lock_init(&mdsc->snap_flush_lock);
3484         mdsc->last_cap_flush_tid = 1;
3485         mdsc->cap_flush_tree = RB_ROOT;
3486         INIT_LIST_HEAD(&mdsc->cap_dirty);
3487         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3488         mdsc->num_cap_flushing = 0;
3489         spin_lock_init(&mdsc->cap_dirty_lock);
3490         init_waitqueue_head(&mdsc->cap_flushing_wq);
3491         spin_lock_init(&mdsc->dentry_lru_lock);
3492         INIT_LIST_HEAD(&mdsc->dentry_lru);
3493
3494         ceph_caps_init(mdsc);
3495         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3496
3497         init_rwsem(&mdsc->pool_perm_rwsem);
3498         mdsc->pool_perm_tree = RB_ROOT;
3499
3500         return 0;
3501 }
3502
3503 /*
3504  * Wait for safe replies on open mds requests.  If we time out, drop
3505  * all requests from the tree to avoid dangling dentry refs.
3506  */
3507 static void wait_requests(struct ceph_mds_client *mdsc)
3508 {
3509         struct ceph_options *opts = mdsc->fsc->client->options;
3510         struct ceph_mds_request *req;
3511
3512         mutex_lock(&mdsc->mutex);
3513         if (__get_oldest_req(mdsc)) {
3514                 mutex_unlock(&mdsc->mutex);
3515
3516                 dout("wait_requests waiting for requests\n");
3517                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3518                                     ceph_timeout_jiffies(opts->mount_timeout));
3519
3520                 /* tear down remaining requests */
3521                 mutex_lock(&mdsc->mutex);
3522                 while ((req = __get_oldest_req(mdsc))) {
3523                         dout("wait_requests timed out on tid %llu\n",
3524                              req->r_tid);
3525                         __unregister_request(mdsc, req);
3526                 }
3527         }
3528         mutex_unlock(&mdsc->mutex);
3529         dout("wait_requests done\n");
3530 }
3531
3532 /*
3533  * called before mount is ro, and before dentries are torn down.
3534  * (hmm, does this still race with new lookups?)
3535  */
3536 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3537 {
3538         dout("pre_umount\n");
3539         mdsc->stopping = 1;
3540
3541         drop_leases(mdsc);
3542         ceph_flush_dirty_caps(mdsc);
3543         wait_requests(mdsc);
3544
3545         /*
3546          * wait for reply handlers to drop their request refs and
3547          * their inode/dcache refs
3548          */
3549         ceph_msgr_flush();
3550 }
3551
3552 /*
3553  * wait for all write mds requests to flush.
3554  */
3555 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3556 {
3557         struct ceph_mds_request *req = NULL, *nextreq;
3558         struct rb_node *n;
3559
3560         mutex_lock(&mdsc->mutex);
3561         dout("wait_unsafe_requests want %lld\n", want_tid);
3562 restart:
3563         req = __get_oldest_req(mdsc);
3564         while (req && req->r_tid <= want_tid) {
3565                 /* find next request */
3566                 n = rb_next(&req->r_node);
3567                 if (n)
3568                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3569                 else
3570                         nextreq = NULL;
3571                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
3572                     (req->r_op & CEPH_MDS_OP_WRITE)) {
3573                         /* write op */
3574                         ceph_mdsc_get_request(req);
3575                         if (nextreq)
3576                                 ceph_mdsc_get_request(nextreq);
3577                         mutex_unlock(&mdsc->mutex);
3578                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3579                              req->r_tid, want_tid);
3580                         wait_for_completion(&req->r_safe_completion);
3581                         mutex_lock(&mdsc->mutex);
3582                         ceph_mdsc_put_request(req);
3583                         if (!nextreq)
3584                                 break;  /* next dne before, so we're done! */
3585                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3586                                 /* next request was removed from tree */
3587                                 ceph_mdsc_put_request(nextreq);
3588                                 goto restart;
3589                         }
3590                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3591                 }
3592                 req = nextreq;
3593         }
3594         mutex_unlock(&mdsc->mutex);
3595         dout("wait_unsafe_requests done\n");
3596 }
3597
3598 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3599 {
3600         u64 want_tid, want_flush, want_snap;
3601
3602         if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3603                 return;
3604
3605         dout("sync\n");
3606         mutex_lock(&mdsc->mutex);
3607         want_tid = mdsc->last_tid;
3608         mutex_unlock(&mdsc->mutex);
3609
3610         ceph_flush_dirty_caps(mdsc);
3611         spin_lock(&mdsc->cap_dirty_lock);
3612         want_flush = mdsc->last_cap_flush_tid;
3613         spin_unlock(&mdsc->cap_dirty_lock);
3614
3615         down_read(&mdsc->snap_rwsem);
3616         want_snap = mdsc->last_snap_seq;
3617         up_read(&mdsc->snap_rwsem);
3618
3619         dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
3620              want_tid, want_flush, want_snap);
3621
3622         wait_unsafe_requests(mdsc, want_tid);
3623         wait_caps_flush(mdsc, want_flush, want_snap);
3624 }
3625
3626 /*
3627  * true if all sessions are closed, or we force unmount
3628  */
3629 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3630 {
3631         if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
3632                 return true;
3633         return atomic_read(&mdsc->num_sessions) == 0;
3634 }
3635
3636 /*
3637  * called after sb is ro.
3638  */
3639 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3640 {
3641         struct ceph_options *opts = mdsc->fsc->client->options;
3642         struct ceph_mds_session *session;
3643         int i;
3644
3645         dout("close_sessions\n");
3646
3647         /* close sessions */
3648         mutex_lock(&mdsc->mutex);
3649         for (i = 0; i < mdsc->max_sessions; i++) {
3650                 session = __ceph_lookup_mds_session(mdsc, i);
3651                 if (!session)
3652                         continue;
3653                 mutex_unlock(&mdsc->mutex);
3654                 mutex_lock(&session->s_mutex);
3655                 __close_session(mdsc, session);
3656                 mutex_unlock(&session->s_mutex);
3657                 ceph_put_mds_session(session);
3658                 mutex_lock(&mdsc->mutex);
3659         }
3660         mutex_unlock(&mdsc->mutex);
3661
3662         dout("waiting for sessions to close\n");
3663         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3664                            ceph_timeout_jiffies(opts->mount_timeout));
3665
3666         /* tear down remaining sessions */
3667         mutex_lock(&mdsc->mutex);
3668         for (i = 0; i < mdsc->max_sessions; i++) {
3669                 if (mdsc->sessions[i]) {
3670                         session = get_session(mdsc->sessions[i]);
3671                         __unregister_session(mdsc, session);
3672                         mutex_unlock(&mdsc->mutex);
3673                         mutex_lock(&session->s_mutex);
3674                         remove_session_caps(session);
3675                         mutex_unlock(&session->s_mutex);
3676                         ceph_put_mds_session(session);
3677                         mutex_lock(&mdsc->mutex);
3678                 }
3679         }
3680         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3681         mutex_unlock(&mdsc->mutex);
3682
3683         ceph_cleanup_empty_realms(mdsc);
3684
3685         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3686
3687         dout("stopped\n");
3688 }
3689
3690 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
3691 {
3692         struct ceph_mds_session *session;
3693         int mds;
3694
3695         dout("force umount\n");
3696
3697         mutex_lock(&mdsc->mutex);
3698         for (mds = 0; mds < mdsc->max_sessions; mds++) {
3699                 session = __ceph_lookup_mds_session(mdsc, mds);
3700                 if (!session)
3701                         continue;
3702                 mutex_unlock(&mdsc->mutex);
3703                 mutex_lock(&session->s_mutex);
3704                 __close_session(mdsc, session);
3705                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
3706                         cleanup_session_requests(mdsc, session);
3707                         remove_session_caps(session);
3708                 }
3709                 mutex_unlock(&session->s_mutex);
3710                 ceph_put_mds_session(session);
3711                 mutex_lock(&mdsc->mutex);
3712                 kick_requests(mdsc, mds);
3713         }
3714         __wake_requests(mdsc, &mdsc->waiting_for_map);
3715         mutex_unlock(&mdsc->mutex);
3716 }
3717
3718 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3719 {
3720         dout("stop\n");
3721         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3722         if (mdsc->mdsmap)
3723                 ceph_mdsmap_destroy(mdsc->mdsmap);
3724         kfree(mdsc->sessions);
3725         ceph_caps_finalize(mdsc);
3726         ceph_pool_perm_destroy(mdsc);
3727 }
3728
3729 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3730 {
3731         struct ceph_mds_client *mdsc = fsc->mdsc;
3732
3733         dout("mdsc_destroy %p\n", mdsc);
3734         ceph_mdsc_stop(mdsc);
3735
3736         /* flush out any connection work with references to us */
3737         ceph_msgr_flush();
3738
3739         fsc->mdsc = NULL;
3740         kfree(mdsc);
3741         dout("mdsc_destroy %p done\n", mdsc);
3742 }
3743
3744
3745 /*
3746  * handle mds map update.
3747  */
3748 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3749 {
3750         u32 epoch;
3751         u32 maplen;
3752         void *p = msg->front.iov_base;
3753         void *end = p + msg->front.iov_len;
3754         struct ceph_mdsmap *newmap, *oldmap;
3755         struct ceph_fsid fsid;
3756         int err = -EINVAL;
3757
3758         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3759         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3760         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3761                 return;
3762         epoch = ceph_decode_32(&p);
3763         maplen = ceph_decode_32(&p);
3764         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3765
3766         /* do we need it? */
3767         mutex_lock(&mdsc->mutex);
3768         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3769                 dout("handle_map epoch %u <= our %u\n",
3770                      epoch, mdsc->mdsmap->m_epoch);
3771                 mutex_unlock(&mdsc->mutex);
3772                 return;
3773         }
3774
3775         newmap = ceph_mdsmap_decode(&p, end);
3776         if (IS_ERR(newmap)) {
3777                 err = PTR_ERR(newmap);
3778                 goto bad_unlock;
3779         }
3780
3781         /* swap into place */
3782         if (mdsc->mdsmap) {
3783                 oldmap = mdsc->mdsmap;
3784                 mdsc->mdsmap = newmap;
3785                 check_new_map(mdsc, newmap, oldmap);
3786                 ceph_mdsmap_destroy(oldmap);
3787         } else {
3788                 mdsc->mdsmap = newmap;  /* first mds map */
3789         }
3790         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3791
3792         __wake_requests(mdsc, &mdsc->waiting_for_map);
3793         ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
3794                           mdsc->mdsmap->m_epoch);
3795
3796         mutex_unlock(&mdsc->mutex);
3797         schedule_delayed(mdsc);
3798         return;
3799
3800 bad_unlock:
3801         mutex_unlock(&mdsc->mutex);
3802 bad:
3803         pr_err("error decoding mdsmap %d\n", err);
3804         return;
3805 }
3806
3807 static struct ceph_connection *con_get(struct ceph_connection *con)
3808 {
3809         struct ceph_mds_session *s = con->private;
3810
3811         if (get_session(s)) {
3812                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3813                 return con;
3814         }
3815         dout("mdsc con_get %p FAIL\n", s);
3816         return NULL;
3817 }
3818
3819 static void con_put(struct ceph_connection *con)
3820 {
3821         struct ceph_mds_session *s = con->private;
3822
3823         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3824         ceph_put_mds_session(s);
3825 }
3826
3827 /*
3828  * if the client is unresponsive for long enough, the mds will kill
3829  * the session entirely.
3830  */
3831 static void peer_reset(struct ceph_connection *con)
3832 {
3833         struct ceph_mds_session *s = con->private;
3834         struct ceph_mds_client *mdsc = s->s_mdsc;
3835
3836         pr_warn("mds%d closed our session\n", s->s_mds);
3837         send_mds_reconnect(mdsc, s);
3838 }
3839
3840 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3841 {
3842         struct ceph_mds_session *s = con->private;
3843         struct ceph_mds_client *mdsc = s->s_mdsc;
3844         int type = le16_to_cpu(msg->hdr.type);
3845
3846         mutex_lock(&mdsc->mutex);
3847         if (__verify_registered_session(mdsc, s) < 0) {
3848                 mutex_unlock(&mdsc->mutex);
3849                 goto out;
3850         }
3851         mutex_unlock(&mdsc->mutex);
3852
3853         switch (type) {
3854         case CEPH_MSG_MDS_MAP:
3855                 ceph_mdsc_handle_map(mdsc, msg);
3856                 break;
3857         case CEPH_MSG_CLIENT_SESSION:
3858                 handle_session(s, msg);
3859                 break;
3860         case CEPH_MSG_CLIENT_REPLY:
3861                 handle_reply(s, msg);
3862                 break;
3863         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3864                 handle_forward(mdsc, s, msg);
3865                 break;
3866         case CEPH_MSG_CLIENT_CAPS:
3867                 ceph_handle_caps(s, msg);
3868                 break;
3869         case CEPH_MSG_CLIENT_SNAP:
3870                 ceph_handle_snap(mdsc, s, msg);
3871                 break;
3872         case CEPH_MSG_CLIENT_LEASE:
3873                 handle_lease(mdsc, s, msg);
3874                 break;
3875
3876         default:
3877                 pr_err("received unknown message type %d %s\n", type,
3878                        ceph_msg_type_name(type));
3879         }
3880 out:
3881         ceph_msg_put(msg);
3882 }
3883
3884 /*
3885  * authentication
3886  */
3887
3888 /*
3889  * Note: returned pointer is the address of a structure that's
3890  * managed separately.  Caller must *not* attempt to free it.
3891  */
3892 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3893                                         int *proto, int force_new)
3894 {
3895         struct ceph_mds_session *s = con->private;
3896         struct ceph_mds_client *mdsc = s->s_mdsc;
3897         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3898         struct ceph_auth_handshake *auth = &s->s_auth;
3899
3900         if (force_new && auth->authorizer) {
3901                 ceph_auth_destroy_authorizer(auth->authorizer);
3902                 auth->authorizer = NULL;
3903         }
3904         if (!auth->authorizer) {
3905                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3906                                                       auth);
3907                 if (ret)
3908                         return ERR_PTR(ret);
3909         } else {
3910                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3911                                                       auth);
3912                 if (ret)
3913                         return ERR_PTR(ret);
3914         }
3915         *proto = ac->protocol;
3916
3917         return auth;
3918 }
3919
3920
3921 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3922 {
3923         struct ceph_mds_session *s = con->private;
3924         struct ceph_mds_client *mdsc = s->s_mdsc;
3925         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3926
3927         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3928 }
3929
3930 static int invalidate_authorizer(struct ceph_connection *con)
3931 {
3932         struct ceph_mds_session *s = con->private;
3933         struct ceph_mds_client *mdsc = s->s_mdsc;
3934         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3935
3936         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3937
3938         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3939 }
3940
3941 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3942                                 struct ceph_msg_header *hdr, int *skip)
3943 {
3944         struct ceph_msg *msg;
3945         int type = (int) le16_to_cpu(hdr->type);
3946         int front_len = (int) le32_to_cpu(hdr->front_len);
3947
3948         if (con->in_msg)
3949                 return con->in_msg;
3950
3951         *skip = 0;
3952         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3953         if (!msg) {
3954                 pr_err("unable to allocate msg type %d len %d\n",
3955                        type, front_len);
3956                 return NULL;
3957         }
3958
3959         return msg;
3960 }
3961
3962 static int mds_sign_message(struct ceph_msg *msg)
3963 {
3964        struct ceph_mds_session *s = msg->con->private;
3965        struct ceph_auth_handshake *auth = &s->s_auth;
3966
3967        return ceph_auth_sign_message(auth, msg);
3968 }
3969
3970 static int mds_check_message_signature(struct ceph_msg *msg)
3971 {
3972        struct ceph_mds_session *s = msg->con->private;
3973        struct ceph_auth_handshake *auth = &s->s_auth;
3974
3975        return ceph_auth_check_message_signature(auth, msg);
3976 }
3977
3978 static const struct ceph_connection_operations mds_con_ops = {
3979         .get = con_get,
3980         .put = con_put,
3981         .dispatch = dispatch,
3982         .get_authorizer = get_authorizer,
3983         .verify_authorizer_reply = verify_authorizer_reply,
3984         .invalidate_authorizer = invalidate_authorizer,
3985         .peer_reset = peer_reset,
3986         .alloc_msg = mds_alloc_msg,
3987         .sign_message = mds_sign_message,
3988         .check_message_signature = mds_check_message_signature,
3989 };
3990
3991 /* eof */