80f5339d097bdaa5af28719534f5db172b179ded
[linux-2.6-block.git] / fs / ceph / dir.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/spinlock.h>
4 #include <linux/fs_struct.h>
5 #include <linux/namei.h>
6 #include <linux/slab.h>
7 #include <linux/sched.h>
8 #include <linux/xattr.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 /*
14  * Directory operations: readdir, lookup, create, link, unlink,
15  * rename, etc.
16  */
17
18 /*
19  * Ceph MDS operations are specified in terms of a base ino and
20  * relative path.  Thus, the client can specify an operation on a
21  * specific inode (e.g., a getattr due to fstat(2)), or as a path
22  * relative to, say, the root directory.
23  *
24  * Normally, we limit ourselves to strict inode ops (no path component)
25  * or dentry operations (a single path component relative to an ino).  The
26  * exception to this is open_root_dentry(), which will open the mount
27  * point by name.
28  */
29
30 const struct dentry_operations ceph_dentry_ops;
31
32 /*
33  * Initialize ceph dentry state.
34  */
int ceph_init_dentry(struct dentry *dentry)
{
        struct ceph_dentry_info *di;

        /* already initialized?  unlocked peek; re-checked under d_lock below */
        if (dentry->d_fsdata)
                return 0;

        /* allocate before taking d_lock; GFP_KERNEL may sleep */
        di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
        if (!di)
                return -ENOMEM;          /* oh well */

        spin_lock(&dentry->d_lock);
        if (dentry->d_fsdata) {
                /* lost a race */
                kmem_cache_free(ceph_dentry_cachep, di);
                goto out_unlock;
        }

        di->dentry = dentry;
        di->lease_session = NULL;
        di->time = jiffies;
        /* avoid reordering d_fsdata setup so that the check above is safe */
        smp_mb();
        dentry->d_fsdata = di;
        ceph_dentry_lru_add(dentry);
out_unlock:
        spin_unlock(&dentry->d_lock);
        return 0;
}
64
65 /*
66  * for f_pos for readdir:
67  * - hash order:
68  *      (0xff << 52) | ((24 bits hash) << 28) |
69  *      (the nth entry has hash collision);
70  * - frag+name order;
71  *      ((frag value) << 28) | (the nth entry in frag);
72  */
73 #define OFFSET_BITS     28
74 #define OFFSET_MASK     ((1 << OFFSET_BITS) - 1)
75 #define HASH_ORDER      (0xffull << (OFFSET_BITS + 24))
76 loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
77 {
78         loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
79         if (hash_order)
80                 fpos |= HASH_ORDER;
81         return fpos;
82 }
83
84 static bool is_hash_order(loff_t p)
85 {
86         return (p & HASH_ORDER) == HASH_ORDER;
87 }
88
89 static unsigned fpos_frag(loff_t p)
90 {
91         return p >> OFFSET_BITS;
92 }
93
94 static unsigned fpos_hash(loff_t p)
95 {
96         return ceph_frag_value(fpos_frag(p));
97 }
98
99 static unsigned fpos_off(loff_t p)
100 {
101         return p & OFFSET_MASK;
102 }
103
104 static int fpos_cmp(loff_t l, loff_t r)
105 {
106         int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
107         if (v)
108                 return v;
109         return (int)(fpos_off(l) - fpos_off(r));
110 }
111
112 /*
113  * make note of the last dentry we read, so we can
114  * continue at the same lexicographical point,
115  * regardless of what dir changes take place on the
116  * server.
117  */
118 static int note_last_dentry(struct ceph_file_info *fi, const char *name,
119                             int len, unsigned next_offset)
120 {
121         char *buf = kmalloc(len+1, GFP_KERNEL);
122         if (!buf)
123                 return -ENOMEM;
124         kfree(fi->last_name);
125         fi->last_name = buf;
126         memcpy(fi->last_name, name, len);
127         fi->last_name[len] = 0;
128         fi->next_offset = next_offset;
129         dout("note_last_dentry '%s'\n", fi->last_name);
130         return 0;
131 }
132
133
/*
 * Fetch the idx-th dentry pointer from the directory's readdir cache
 * (an array of dentry pointers stored in the dir's page cache) and
 * take a reference on it.
 *
 * Returns the referenced dentry, NULL when idx is beyond the cached
 * size, or ERR_PTR(-EAGAIN) when the cache page is missing or the
 * cached slot cannot be trusted (callers fall back to a sync readdir).
 */
static struct dentry *
__dcache_find_get_entry(struct dentry *parent, u64 idx,
                        struct ceph_readdir_cache_control *cache_ctl)
{
        struct inode *dir = d_inode(parent);
        struct dentry *dentry;
        unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
        loff_t ptr_pos = idx * sizeof(struct dentry *);
        pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;

        /* i_size tracks the number of cached entries (in bytes of pointers) */
        if (ptr_pos >= i_size_read(dir))
                return NULL;

        /* switch to the page holding slot idx if not already mapped */
        if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
                ceph_readdir_cache_release(cache_ctl);
                cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
                if (!cache_ctl->page) {
                        dout(" page %lu not found\n", ptr_pgoff);
                        return ERR_PTR(-EAGAIN);
                }
                /* reading/filling the cache are serialized by
                   i_mutex, no need to use page lock */
                unlock_page(cache_ctl->page);
                cache_ctl->dentries = kmap(cache_ctl->page);
        }

        cache_ctl->index = idx & idx_mask;

        rcu_read_lock();
        spin_lock(&parent->d_lock);
        /* check i_size again here, because empty directory can be
         * marked as complete while not holding the i_mutex. */
        if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
                dentry = cache_ctl->dentries[cache_ctl->index];
        else
                dentry = NULL;
        spin_unlock(&parent->d_lock);
        /* the slot may point at a dying dentry; keep it only if we can
         * still take a reference under RCU */
        if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
                dentry = NULL;
        rcu_read_unlock();
        return dentry ? : ERR_PTR(-EAGAIN);
}
176
177 /*
178  * When possible, we try to satisfy a readdir by peeking at the
179  * dcache.  We make this work by carefully ordering dentries on
180  * d_child when we initially get results back from the MDS, and
181  * falling back to a "normal" sync readdir if any dentries in the dir
182  * are dropped.
183  *
184  * Complete dir indicates that we have all dentries in the dir.  It is
185  * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
186  * the MDS if/when the directory is modified).
187  */
static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                            u32 shared_gen)
{
        struct ceph_file_info *fi = file->private_data;
        struct dentry *parent = file->f_path.dentry;
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
        struct ceph_dentry_info *di;
        struct ceph_readdir_cache_control cache_ctl = {};
        u64 idx = 0;
        int err = 0;

        dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);

        /* search start position */
        if (ctx->pos > 2) {
                u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
                /* binary-search the cache for the first entry at/after pos */
                while (count > 0) {
                        u64 step = count >> 1;
                        dentry = __dcache_find_get_entry(parent, idx + step,
                                                         &cache_ctl);
                        if (!dentry) {
                                /* use linear search */
                                idx = 0;
                                break;
                        }
                        if (IS_ERR(dentry)) {
                                err = PTR_ERR(dentry);
                                goto out;
                        }
                        di = ceph_dentry(dentry);
                        spin_lock(&dentry->d_lock);
                        if (fpos_cmp(di->offset, ctx->pos) < 0) {
                                idx += step + 1;
                                count -= step + 1;
                        } else {
                                count = step;
                        }
                        spin_unlock(&dentry->d_lock);
                        dput(dentry);
                }

                dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
        }


        for (;;) {
                bool emit_dentry = false;
                dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
                if (!dentry) {
                        /* walked past the last cached entry: we are done */
                        fi->flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
                if (IS_ERR(dentry)) {
                        err = PTR_ERR(dentry);
                        goto out;
                }

                /* only emit dentries that are positive, still covered by
                 * the lease generation we sampled, and at/after ctx->pos */
                di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
                if (di->lease_shared_gen == shared_gen &&
                    d_really_is_positive(dentry) &&
                    fpos_cmp(ctx->pos, di->offset) <= 0) {
                        emit_dentry = true;
                }
                spin_unlock(&dentry->d_lock);

                if (emit_dentry) {
                        dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
                                      dentry->d_name.len,
                                      ceph_translate_ino(dentry->d_sb,
                                                         d_inode(dentry)->i_ino),
                                      d_inode(dentry)->i_mode >> 12)) {
                                dput(dentry);
                                err = 0;
                                break;
                        }
                        ctx->pos++;

                        /* remember the last emitted dentry so we can
                         * resume from its name later */
                        if (last)
                                dput(last);
                        last = dentry;
                } else {
                        dput(dentry);
                }
        }
out:
        ceph_readdir_cache_release(&cache_ctl);
        if (last) {
                int ret;
                di = ceph_dentry(last);
                ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
                                       fpos_off(di->offset) + 1);
                if (ret < 0)
                        err = ret;
                dput(last);
        }
        return err;
}
291
292 static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
293 {
294         if (!fi->last_readdir)
295                 return true;
296         if (is_hash_order(pos))
297                 return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
298         else
299                 return fi->frag != fpos_frag(pos);
300 }
301
/*
 * Main readdir entry point: emit "." and "..", try the cached-dcache
 * fast path when the dir is known complete+ordered, otherwise page
 * through MDS readdir replies one frag/chunk at a time.
 */
static int ceph_readdir(struct file *file, struct dir_context *ctx)
{
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        int i;
        int err;
        u32 ftype;
        struct ceph_mds_reply_info_parsed *rinfo;

        dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
        if (fi->flags & CEPH_F_ATEND)
                return 0;

        /* always start with . and .. */
        if (ctx->pos == 0) {
                dout("readdir off 0 -> '.'\n");
                if (!dir_emit(ctx, ".", 1,
                            ceph_translate_ino(inode->i_sb, inode->i_ino),
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 1;
        }
        if (ctx->pos == 1) {
                ino_t ino = parent_ino(file->f_path.dentry);
                dout("readdir off 1 -> '..'\n");
                if (!dir_emit(ctx, "..", 2,
                            ceph_translate_ino(inode->i_sb, ino),
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 2;
        }

        /* can we use the dcache? */
        spin_lock(&ci->i_ceph_lock);
        if (ceph_test_mount_opt(fsc, DCACHE) &&
            !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
            ceph_snap(inode) != CEPH_SNAPDIR &&
            __ceph_dir_is_complete_ordered(ci) &&
            __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
                u32 shared_gen = ci->i_shared_gen;
                spin_unlock(&ci->i_ceph_lock);
                err = __dcache_readdir(file, ctx, shared_gen);
                /* -EAGAIN means the dcache path bailed; fall through to
                 * the normal MDS readdir below */
                if (err != -EAGAIN)
                        return err;
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }

        /* proceed with a normal readdir */
more:
        /* do we have the correct frag content buffered? */
        if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
                unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;

                /* discard old result, if any */
                if (fi->last_readdir) {
                        ceph_mdsc_put_request(fi->last_readdir);
                        fi->last_readdir = NULL;
                }

                if (is_hash_order(ctx->pos)) {
                        frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
                                                NULL, NULL);
                } else {
                        frag = fpos_frag(ctx->pos);
                }

                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
                if (IS_ERR(req))
                        return PTR_ERR(req);
                err = ceph_alloc_readdir_reply_buffer(req, inode);
                if (err) {
                        ceph_mdsc_put_request(req);
                        return err;
                }
                /* hints to request -> mds selection code */
                req->r_direct_mode = USE_AUTH_MDS;
                req->r_direct_hash = ceph_frag_value(frag);
                req->r_direct_is_hash = true;
                /* resume after the last name we saw, if any */
                if (fi->last_name) {
                        req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
                        if (!req->r_path2) {
                                ceph_mdsc_put_request(req);
                                return -ENOMEM;
                        }
                }
                req->r_dir_release_cnt = fi->dir_release_count;
                req->r_dir_ordered_cnt = fi->dir_ordered_count;
                req->r_readdir_cache_idx = fi->readdir_cache_idx;
                req->r_readdir_offset = fi->next_offset;
                req->r_args.readdir.frag = cpu_to_le32(frag);
                req->r_args.readdir.flags =
                                cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);

                req->r_inode = inode;
                ihold(inode);
                req->r_dentry = dget(file->f_path.dentry);
                err = ceph_mdsc_do_request(mdsc, NULL, req);
                if (err < 0) {
                        ceph_mdsc_put_request(req);
                        return err;
                }
                dout("readdir got and parsed readdir result=%d on "
                     "frag %x, end=%d, complete=%d, hash_order=%d\n",
                     err, frag,
                     (int)req->r_reply_info.dir_end,
                     (int)req->r_reply_info.dir_complete,
                     (int)req->r_reply_info.hash_order);

                /* the MDS may answer for a different (e.g. split/merged)
                 * frag than the one we asked about */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
                        if (!rinfo->hash_order) {
                                fi->next_offset = req->r_readdir_offset;
                                /* adjust ctx->pos to beginning of frag */
                                ctx->pos = ceph_make_fpos(frag,
                                                          fi->next_offset,
                                                          false);
                        }
                }

                fi->frag = frag;
                fi->last_readdir = req;

                if (req->r_did_prepopulate) {
                        fi->readdir_cache_idx = req->r_readdir_cache_idx;
                        if (fi->readdir_cache_idx < 0) {
                                /* preclude from marking dir ordered */
                                fi->dir_ordered_count = 0;
                        } else if (ceph_frag_is_leftmost(frag) &&
                                   fi->next_offset == 2) {
                                /* note dir version at start of readdir so
                                 * we can tell if any dentries get dropped */
                                fi->dir_release_count = req->r_dir_release_cnt;
                                fi->dir_ordered_count = req->r_dir_ordered_cnt;
                        }
                } else {
                        dout("readdir !did_prepopulate");
                        /* disable readdir cache */
                        fi->readdir_cache_idx = -1;
                        /* preclude from marking dir complete */
                        fi->dir_release_count = 0;
                }

                /* note next offset and last dentry name */
                if (rinfo->dir_nr > 0) {
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
                        unsigned next_offset = req->r_reply_info.dir_end ?
                                        2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
                                               next_offset);
                        if (err)
                                return err;
                } else if (req->r_reply_info.dir_end) {
                        fi->next_offset = 2;
                        /* keep last name */
                }
        }

        rinfo = &fi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
             fi->frag, rinfo->dir_nr, ctx->pos,
             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);

        i = 0;
        /* search start position */
        if (rinfo->dir_nr > 0) {
                int step, nr = rinfo->dir_nr;
                /* binary search the buffered entries for ctx->pos */
                while (nr > 0) {
                        step = nr >> 1;
                        if (rinfo->dir_entries[i + step].offset < ctx->pos) {
                                i +=  step + 1;
                                nr -= step + 1;
                        } else {
                                nr = step;
                        }
                }
        }
        for (; i < rinfo->dir_nr; i++) {
                struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                ino_t ino;

                BUG_ON(rde->offset < ctx->pos);

                ctx->pos = rde->offset;
                dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
                     i, rinfo->dir_nr, ctx->pos,
                     rde->name_len, rde->name, &rde->inode.in);

                BUG_ON(!rde->inode.in);
                ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
                ino = ceph_vino_to_ino(vino);

                if (!dir_emit(ctx, rde->name, rde->name_len,
                              ceph_translate_ino(inode->i_sb, ino), ftype)) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
                ctx->pos++;
        }

        /* next_offset > 2 means dir_end was not set for this chunk:
         * more entries remain in the current frag, fetch the next chunk */
        if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }

        /* more frags? */
        if (!ceph_frag_is_rightmost(fi->frag)) {
                unsigned frag = ceph_frag_next(fi->frag);
                if (is_hash_order(ctx->pos)) {
                        loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
                                                        fi->next_offset, true);
                        if (new_pos > ctx->pos)
                                ctx->pos = new_pos;
                        /* keep last_name */
                } else {
                        ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
                        kfree(fi->last_name);
                        fi->last_name = NULL;
                }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
        fi->flags |= CEPH_F_ATEND;

        /*
         * if dir_release_count still matches the dir, no dentries
         * were released during the whole readdir, and we should have
         * the complete dir contents in our cache.
         */
        if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
                spin_lock(&ci->i_ceph_lock);
                if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
                        dout(" marking %p complete and ordered\n", inode);
                        /* use i_size to track number of entries in
                         * readdir cache */
                        BUG_ON(fi->readdir_cache_idx < 0);
                        i_size_write(inode, fi->readdir_cache_idx *
                                     sizeof(struct dentry*));
                } else {
                        dout(" marking %p complete\n", inode);
                }
                __ceph_dir_set_complete(ci, fi->dir_release_count,
                                        fi->dir_ordered_count);
                spin_unlock(&ci->i_ceph_lock);
        }

        dout("readdir %p file %p done.\n", inode, file);
        return 0;
}
565
566 static void reset_readdir(struct ceph_file_info *fi)
567 {
568         if (fi->last_readdir) {
569                 ceph_mdsc_put_request(fi->last_readdir);
570                 fi->last_readdir = NULL;
571         }
572         kfree(fi->last_name);
573         fi->last_name = NULL;
574         fi->dir_release_count = 0;
575         fi->readdir_cache_idx = -1;
576         fi->next_offset = 2;  /* compensate for . and .. */
577         fi->flags &= ~CEPH_F_ATEND;
578 }
579
580 /*
581  * discard buffered readdir content on seekdir(0), or seek to new frag,
582  * or seek prior to current chunk
583  */
static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
{
        struct ceph_mds_reply_info_parsed *rinfo;
        loff_t chunk_offset;
        if (new_pos == 0)
                return true;
        if (is_hash_order(new_pos)) {
                /* no need to reset last_name for a forward seek when
                 * dentries are sorted in hash order */
        } else if (fi->frag != fpos_frag(new_pos)) {
                return true;
        }
        /* otherwise keep the buffered chunk only if new_pos falls at or
         * after its first entry and uses the same ordering scheme */
        rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
        if (!rinfo || !rinfo->dir_nr)
                return true;
        chunk_offset = rinfo->dir_entries[0].offset;
        return new_pos < chunk_offset ||
               is_hash_order(new_pos) != is_hash_order(chunk_offset);
}
603
static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
{
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_mapping->host;
        loff_t retval;

        inode_lock(inode);
        retval = -EINVAL;
        switch (whence) {
        case SEEK_CUR:
                offset += file->f_pos;
                /* fall through */
        case SEEK_SET:
                break;
        case SEEK_END:
                /* SEEK_END is not supported for directories */
                retval = -EOPNOTSUPP;
                /* fall through */
        default:
                goto out;
        }

        if (offset >= 0) {
                if (need_reset_readdir(fi, offset)) {
                        dout("dir_llseek dropping %p content\n", file);
                        reset_readdir(fi);
                } else if (is_hash_order(offset) && offset > file->f_pos) {
                        /* for hash offset, we don't know if a forward seek
                         * is within same frag */
                        fi->dir_release_count = 0;
                        fi->readdir_cache_idx = -1;
                }

                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
                        fi->flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
        }
out:
        inode_unlock(inode);
        return retval;
}
645
646 /*
647  * Handle lookups for the hidden .snap directory.
648  */
649 int ceph_handle_snapdir(struct ceph_mds_request *req,
650                         struct dentry *dentry, int err)
651 {
652         struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
653         struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */
654
655         /* .snap dir? */
656         if (err == -ENOENT &&
657             ceph_snap(parent) == CEPH_NOSNAP &&
658             strcmp(dentry->d_name.name,
659                    fsc->mount_options->snapdir_name) == 0) {
660                 struct inode *inode = ceph_get_snapdir(parent);
661                 dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n",
662                      dentry, dentry, inode);
663                 BUG_ON(!d_unhashed(dentry));
664                 d_add(dentry, inode);
665                 err = 0;
666         }
667         return err;
668 }
669
670 /*
671  * Figure out final result of a lookup/open request.
672  *
673  * Mainly, make sure we return the final req->r_dentry (if it already
674  * existed) in place of the original VFS-provided dentry when they
675  * differ.
676  *
677  * Gracefully handle the case where the MDS replies with -ENOENT and
678  * no trace (which it may do, at its discretion, e.g., if it doesn't
679  * care to issue a lease on the negative dentry).
680  */
681 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
682                                   struct dentry *dentry, int err)
683 {
684         if (err == -ENOENT) {
685                 /* no trace? */
686                 err = 0;
687                 if (!req->r_reply_info.head->is_dentry) {
688                         dout("ENOENT and no trace, dentry %p inode %p\n",
689                              dentry, d_inode(dentry));
690                         if (d_really_is_positive(dentry)) {
691                                 d_drop(dentry);
692                                 err = -ENOENT;
693                         } else {
694                                 d_add(dentry, NULL);
695                         }
696                 }
697         }
698         if (err)
699                 dentry = ERR_PTR(err);
700         else if (dentry != req->r_dentry)
701                 dentry = dget(req->r_dentry);   /* we got spliced */
702         else
703                 dentry = NULL;
704         return dentry;
705 }
706
/* true for the hidden ".ceph" entry in the root directory; note this is
 * a prefix match (strncmp with 5), so any root-level name beginning with
 * ".ceph" also matches -- NOTE(review): looks intentional (keeps such
 * lookups off the local-ENOENT fast path) but confirm */
static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
{
        return ceph_ino(inode) == CEPH_INO_ROOT &&
                strncmp(dentry->d_name.name, ".ceph", 5) == 0;
}
712
713 /*
714  * Look up a single dir entry.  If there is a lookup intent, inform
715  * the MDS so that it gets our 'caps wanted' value in a single op.
716  */
static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                                  unsigned int flags)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int op;
        int mask;
        int err;

        dout("lookup %p dentry %p '%pd'\n",
             dir, dentry, dentry);

        if (dentry->d_name.len > NAME_MAX)
                return ERR_PTR(-ENAMETOOLONG);

        err = ceph_init_dentry(dentry);
        if (err < 0)
                return ERR_PTR(err);

        /* can we conclude ENOENT locally? */
        if (d_really_is_negative(dentry)) {
                struct ceph_inode_info *ci = ceph_inode(dir);
                struct ceph_dentry_info *di = ceph_dentry(dentry);

                spin_lock(&ci->i_ceph_lock);
                dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
                /* with DCACHE enabled, a complete dir view plus a held
                 * FILE_SHARED cap lets us answer ENOENT without asking
                 * the MDS -- except for the snapdir and ".ceph" names */
                if (strncmp(dentry->d_name.name,
                            fsc->mount_options->snapdir_name,
                            dentry->d_name.len) &&
                    !is_root_ceph_dentry(dir, dentry) &&
                    ceph_test_mount_opt(fsc, DCACHE) &&
                    __ceph_dir_is_complete(ci) &&
                    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
                        spin_unlock(&ci->i_ceph_lock);
                        dout(" dir %p complete, -ENOENT\n", dir);
                        d_add(dentry, NULL);
                        di->lease_shared_gen = ci->i_shared_gen;
                        return NULL;
                }
                spin_unlock(&ci->i_ceph_lock);
        }

        op = ceph_snap(dir) == CEPH_SNAPDIR ?
                CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
        req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
        if (IS_ERR(req))
                return ERR_CAST(req);
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;

        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
        if (ceph_security_xattr_wanted(dir))
                mask |= CEPH_CAP_XATTR_SHARED;
        req->r_args.getattr.mask = cpu_to_le32(mask);

        req->r_locked_dir = dir;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        err = ceph_handle_snapdir(req, dentry, err);
        dentry = ceph_finish_lookup(req, dentry, err);
        ceph_mdsc_put_request(req);  /* will dput(dentry) */
        dout("lookup result=%p\n", dentry);
        return dentry;
}
781
782 /*
783  * If we do a create but get no trace back from the MDS, follow up with
784  * a lookup (the VFS expects us to link up the provided dentry).
785  */
786 int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
787 {
788         struct dentry *result = ceph_lookup(dir, dentry, 0);
789
790         if (result && !IS_ERR(result)) {
791                 /*
792                  * We created the item, then did a lookup, and found
793                  * it was already linked to another inode we already
794                  * had in our cache (and thus got spliced). To not
795                  * confuse VFS (especially when inode is a directory),
796                  * we don't link our dentry to that inode, return an
797                  * error instead.
798                  *
799                  * This event should be rare and it happens only when
800                  * we talk to old MDS. Recent MDS does not send traceless
801                  * reply for request that creates new inode.
802                  */
803                 d_drop(result);
804                 return -ESTALE;
805         }
806         return PTR_ERR(result);
807 }
808
/*
 * Create a special file (or, via ceph_create, a regular file) by
 * sending a MKNOD request to the auth MDS.  Returns 0 on success or a
 * negative errno; on failure the dentry is dropped from the dcache.
 */
static int ceph_mknod(struct inode *dir, struct dentry *dentry,
                      umode_t mode, dev_t rdev)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_acls_info acls = {};
        int err;

        /* snapshots are read-only; only real (non-snap) dirs allow create */
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;

        /* may adjust mode per the dir's default ACL and prepare xattr data */
        err = ceph_pre_init_acls(dir, &mode, &acls);
        if (err < 0)
                return err;

        dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
             dir, dentry, mode, rdev);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_locked_dir = dir;
        req->r_args.mknod.mode = cpu_to_le32(mode);
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        if (acls.pagelist) {
                /* hand ACL xattr data to the request; it now owns the list */
                req->r_pagelist = acls.pagelist;
                acls.pagelist = NULL;
        }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                /* traceless reply (old MDS): link up the dentry ourselves */
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
out:
        if (!err)
                ceph_init_inode_acls(d_inode(dentry), &acls);
        else
                d_drop(dentry);
        ceph_release_acls_info(&acls);
        return err;
}
855
/*
 * create(2): a regular file is just a mknod with rdev 0.
 * NOTE(review): 'excl' is not consulted here — presumably exclusivity
 * is enforced by the MDS create path; confirm against MDS semantics.
 */
static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode,
                       bool excl)
{
        return ceph_mknod(dir, dentry, mode, 0);
}
861
/*
 * symlink(2): create a symlink via a SYMLINK request to the auth MDS.
 * The link target travels in r_path2.  Returns 0 or a negative errno;
 * the dentry is dropped on any failure.
 */
static int ceph_symlink(struct inode *dir, struct dentry *dentry,
                            const char *dest)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;

        /* snapshots are read-only */
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;

        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        /* the request owns this copy of the target path */
        req->r_path2 = kstrdup(dest, GFP_KERNEL);
        if (!req->r_path2) {
                err = -ENOMEM;
                ceph_mdsc_put_request(req);
                goto out;
        }
        req->r_locked_dir = dir;
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                /* traceless reply (old MDS): link up the dentry ourselves */
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
out:
        if (err)
                d_drop(dentry);
        return err;
}
899
/*
 * mkdir(2).  Creating a directory under .snap is a snapshot creation
 * (MKSNAP); under a regular directory it is a normal MKDIR.  Any other
 * snap context is read-only.  Returns 0 or a negative errno; the
 * dentry is dropped on failure.
 */
static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_acls_info acls = {};
        int err = -EROFS;
        int op;

        if (ceph_snap(dir) == CEPH_SNAPDIR) {
                /* mkdir .snap/foo is a MKSNAP */
                op = CEPH_MDS_OP_MKSNAP;
                dout("mksnap dir %p snap '%pd' dn %p\n", dir,
                     dentry, dentry);
        } else if (ceph_snap(dir) == CEPH_NOSNAP) {
                dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
                op = CEPH_MDS_OP_MKDIR;
        } else {
                /* anywhere else inside a snapshot: read-only */
                goto out;
        }

        mode |= S_IFDIR;
        /* may adjust mode per the dir's default ACL and prepare xattr data */
        err = ceph_pre_init_acls(dir, &mode, &acls);
        if (err < 0)
                goto out;

        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }

        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_locked_dir = dir;
        req->r_args.mkdir.mode = cpu_to_le32(mode);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        if (acls.pagelist) {
                /* hand ACL xattr data to the request; it now owns the list */
                req->r_pagelist = acls.pagelist;
                acls.pagelist = NULL;
        }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err &&
            !req->r_reply_info.head->is_target &&
            !req->r_reply_info.head->is_dentry)
                /* traceless reply (old MDS): link up the dentry ourselves */
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
out:
        if (!err)
                ceph_init_inode_acls(d_inode(dentry), &acls);
        else
                d_drop(dentry);
        ceph_release_acls_info(&acls);
        return err;
}
956
/*
 * link(2): create a hard link to old_dentry's inode at dentry via a
 * LINK request to the auth MDS.  Returns 0 or a negative errno; the
 * new dentry is dropped on failure.
 */
static int ceph_link(struct dentry *old_dentry, struct inode *dir,
                     struct dentry *dentry)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;

        /* snapshots are read-only */
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;

        dout("link in dir %p old_dentry %p dentry %p\n", dir,
             old_dentry, dentry);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                d_drop(dentry);
                return PTR_ERR(req);
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry);
        req->r_locked_dir = dir;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_SHARED on source inode (mds will lock it) */
        req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (err) {
                d_drop(dentry);
        } else if (!req->r_reply_info.head->is_dentry) {
                /* traceless reply: instantiate the new link ourselves,
                 * taking an extra inode reference for the new dentry */
                ihold(d_inode(old_dentry));
                d_instantiate(dentry, d_inode(old_dentry));
        }
        ceph_mdsc_put_request(req);
        return err;
}
993
994 /*
995  * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps.  If it
996  * looks like the link count will hit 0, drop any other caps (other
997  * than PIN) we don't specifically want (due to the file still being
998  * open).
999  */
1000 static int drop_caps_for_unlink(struct inode *inode)
1001 {
1002         struct ceph_inode_info *ci = ceph_inode(inode);
1003         int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
1004
1005         spin_lock(&ci->i_ceph_lock);
1006         if (inode->i_nlink == 1) {
1007                 drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
1008                 ci->i_ceph_flags |= CEPH_I_NODELAY;
1009         }
1010         spin_unlock(&ci->i_ceph_lock);
1011         return drop;
1012 }
1013
1014 /*
1015  * rmdir and unlink are differ only by the metadata op code
1016  */
/*
 * unlink(2)/rmdir(2), also wired up as .rmdir.  Removing a name under
 * .snap is a snapshot removal (RMSNAP); under a regular directory it
 * is UNLINK or RMDIR depending on the dentry type.  Returns 0 or a
 * negative errno.
 */
static int ceph_unlink(struct inode *dir, struct dentry *dentry)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct inode *inode = d_inode(dentry);
        struct ceph_mds_request *req;
        int err = -EROFS;
        int op;

        if (ceph_snap(dir) == CEPH_SNAPDIR) {
                /* rmdir .snap/foo is RMSNAP */
                dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry);
                op = CEPH_MDS_OP_RMSNAP;
        } else if (ceph_snap(dir) == CEPH_NOSNAP) {
                dout("unlink/rmdir dir %p dn %p inode %p\n",
                     dir, dentry, inode);
                op = d_is_dir(dentry) ?
                        CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
        } else
                /* anywhere else inside a snapshot: read-only */
                goto out;
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_locked_dir = dir;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* proactively release caps the MDS will want to revoke anyway */
        req->r_inode_drop = drop_caps_for_unlink(inode);
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                /* traceless reply: unhash the dentry ourselves */
                d_delete(dentry);
        ceph_mdsc_put_request(req);
out:
        return err;
}
1055
/*
 * rename(2).  Both names must be in the same snap context; renaming
 * within a .snap directory renames the snapshot (RENAMESNAP), any
 * other snap context is read-only.  No RENAME_* flags are supported.
 * Returns 0 or a negative errno.
 */
static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
                       struct inode *new_dir, struct dentry *new_dentry,
                       unsigned int flags)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int op = CEPH_MDS_OP_RENAME;
        int err;

        if (flags)
                return -EINVAL;

        if (ceph_snap(old_dir) != ceph_snap(new_dir))
                return -EXDEV;
        if (ceph_snap(old_dir) != CEPH_NOSNAP) {
                if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
                        op = CEPH_MDS_OP_RENAMESNAP;
                else
                        return -EROFS;
        }
        dout("rename dir %p dentry %p to dir %p dentry %p\n",
             old_dir, old_dentry, new_dir, new_dentry);
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req))
                return PTR_ERR(req);
        /* reference for r_old_dentry_dir, released with the request */
        ihold(old_dir);
        req->r_dentry = dget(new_dentry);
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry);
        req->r_old_dentry_dir = old_dir;
        req->r_locked_dir = new_dir;
        req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_SHARED on source inode (mds will lock it) */
        req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
        if (d_really_is_positive(new_dentry))
                /* the target name exists: its inode may lose a link */
                req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
        err = ceph_mdsc_do_request(mdsc, old_dir, req);
        if (!err && !req->r_reply_info.head->is_dentry) {
                /*
                 * Normally d_move() is done by fill_trace (called by
                 * do_request, above).  If there is no trace, we need
                 * to do it here.
                 */

                /* d_move screws up sibling dentries' offsets */
                ceph_dir_clear_complete(old_dir);
                ceph_dir_clear_complete(new_dir);

                d_move(old_dentry, new_dentry);

                /* ensure target dentry is invalidated, despite
                   rehashing bug in vfs_rename_dir */
                ceph_invalidate_dentry_lease(new_dentry);
        }
        ceph_mdsc_put_request(req);
        return err;
}
1117
1118 /*
1119  * Ensure a dentry lease will no longer revalidate.
1120  */
1121 void ceph_invalidate_dentry_lease(struct dentry *dentry)
1122 {
1123         spin_lock(&dentry->d_lock);
1124         ceph_dentry(dentry)->time = jiffies;
1125         ceph_dentry(dentry)->lease_shared_gen = 0;
1126         spin_unlock(&dentry->d_lock);
1127 }
1128
1129 /*
1130  * Check if dentry lease is valid.  If not, delete the lease.  Try to
1131  * renew if the least is more than half up.
1132  */
/*
 * Returns 1 if the dentry's MDS lease is still valid, 0 if not, or
 * -ECHILD if validity requires a renewal message we cannot send in
 * RCU-walk mode.  If the lease is past its renew point, fire off a
 * LEASE_RENEW to the MDS (outside the dentry lock).
 */
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
                                 struct inode *dir)
{
        struct ceph_dentry_info *di;
        struct ceph_mds_session *s;
        int valid = 0;
        u32 gen;
        unsigned long ttl;
        struct ceph_mds_session *session = NULL;
        u32 seq = 0;

        spin_lock(&dentry->d_lock);
        di = ceph_dentry(dentry);
        if (di && di->lease_session) {
                s = di->lease_session;
                /* snapshot the session's cap generation and ttl */
                spin_lock(&s->s_gen_ttl_lock);
                gen = s->s_cap_gen;
                ttl = s->s_cap_ttl;
                spin_unlock(&s->s_gen_ttl_lock);

                /* lease counts only if from the current session gen and
                 * neither the lease time nor the session ttl has passed */
                if (di->lease_gen == gen &&
                    time_before(jiffies, di->time) &&
                    time_before(jiffies, ttl)) {
                        valid = 1;
                        if (di->lease_renew_after &&
                            time_after(jiffies, di->lease_renew_after)) {
                                /*
                                 * We should renew. If we're in RCU walk mode
                                 * though, we can't do that so just return
                                 * -ECHILD.
                                 */
                                if (flags & LOOKUP_RCU) {
                                        valid = -ECHILD;
                                } else {
                                        /* take a session ref; send after
                                         * dropping d_lock */
                                        session = ceph_get_mds_session(s);
                                        seq = di->lease_seq;
                                        di->lease_renew_after = 0;
                                        di->lease_renew_from = jiffies;
                                }
                        }
                }
        }
        spin_unlock(&dentry->d_lock);

        if (session) {
                ceph_mdsc_lease_send_msg(session, dir, dentry,
                                         CEPH_MDS_LEASE_RENEW, seq);
                ceph_put_mds_session(session);
        }
        dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
        return valid;
}
1185
1186 /*
1187  * Check if directory-wide content lease/cap is valid.
1188  */
/*
 * Returns nonzero if the parent dir still holds FILE_SHARED caps from
 * the same shared gen the dentry was populated under, i.e. the dir's
 * content is known not to have changed since.
 */
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
{
        struct ceph_inode_info *ci = ceph_inode(dir);
        struct ceph_dentry_info *di = ceph_dentry(dentry);
        int valid = 0;

        spin_lock(&ci->i_ceph_lock);
        if (ci->i_shared_gen == di->lease_shared_gen)
                valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
        spin_unlock(&ci->i_ceph_lock);
        /* gens re-read outside the lock here: debug output only */
        dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
             dir, (unsigned)ci->i_shared_gen, dentry,
             (unsigned)di->lease_shared_gen, valid);
        return valid;
}
1204
1205 /*
1206  * Check if cached dentry can be trusted.
1207  */
/*
 * d_revalidate: returns 1 if the cached dentry can be trusted, 0 if it
 * must be invalidated, or -ECHILD when the check needs to block but we
 * are in RCU-walk mode.  Tries the dentry lease, then the parent dir's
 * cap-backed lease, and finally falls back to a synchronous LOOKUP to
 * the MDS.
 */
static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
        int valid = 0;
        struct dentry *parent;
        struct inode *dir;

        if (flags & LOOKUP_RCU) {
                /* no refcounts in RCU walk; bail if parent inode is gone */
                parent = ACCESS_ONCE(dentry->d_parent);
                dir = d_inode_rcu(parent);
                if (!dir)
                        return -ECHILD;
        } else {
                parent = dget_parent(dentry);
                dir = d_inode(parent);
        }

        dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
             dentry, d_inode(dentry), ceph_dentry(dentry)->offset);

        /* always trust cached snapped dentries, snapdir dentry */
        if (ceph_snap(dir) != CEPH_NOSNAP) {
                dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
                     dentry, d_inode(dentry));
                valid = 1;
        } else if (d_really_is_positive(dentry) &&
                   ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
                valid = 1;
        } else {
                valid = dentry_lease_is_valid(dentry, flags, dir);
                if (valid == -ECHILD)
                        return valid;
                if (valid || dir_lease_is_valid(dir, dentry)) {
                        /* a positive dentry also needs some caps on its
                         * inode to be trustworthy */
                        if (d_really_is_positive(dentry))
                                valid = ceph_is_any_caps(d_inode(dentry));
                        else
                                valid = 1;
                }
        }

        if (!valid) {
                /* no usable lease: ask the MDS directly */
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(dir->i_sb)->mdsc;
                struct ceph_mds_request *req;
                int op, mask, err;

                if (flags & LOOKUP_RCU)
                        return -ECHILD;

                op = ceph_snap(dir) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
                req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
                if (!IS_ERR(req)) {
                        req->r_dentry = dget(dentry);
                        req->r_num_caps = 2;

                        mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
                        if (ceph_security_xattr_wanted(dir))
                                mask |= CEPH_CAP_XATTR_SHARED;
                        req->r_args.getattr.mask = mask;

                        req->r_locked_dir = dir;
                        err = ceph_mdsc_do_request(mdsc, NULL, req);
                        if (err == 0 || err == -ENOENT) {
                                if (dentry == req->r_dentry) {
                                        /* trace processing decided hashing */
                                        valid = !d_unhashed(dentry);
                                } else {
                                        /* reply got spliced to a different
                                         * dentry; retry the walk */
                                        d_invalidate(req->r_dentry);
                                        err = -EAGAIN;
                                }
                        }
                        ceph_mdsc_put_request(req);
                        dout("d_revalidate %p lookup result=%d\n",
                             dentry, err);
                }
        }

        dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
        if (valid) {
                ceph_dentry_lru_touch(dentry);
        } else {
                ceph_dir_clear_complete(dir);
        }

        if (!(flags & LOOKUP_RCU))
                dput(parent);
        return valid;
}
1295
1296 /*
1297  * Release our ceph_dentry_info.
1298  */
/*
 * d_release: free the ceph_dentry_info attached to a dying dentry,
 * dropping its LRU entry and lease session reference.
 */
static void ceph_d_release(struct dentry *dentry)
{
        struct ceph_dentry_info *di = ceph_dentry(dentry);

        dout("d_release %p\n", dentry);
        ceph_dentry_lru_del(dentry);

        /* detach d_fsdata under d_lock before freeing it */
        spin_lock(&dentry->d_lock);
        dentry->d_fsdata = NULL;
        spin_unlock(&dentry->d_lock);

        if (di->lease_session)
                ceph_put_mds_session(di->lease_session);
        kmem_cache_free(ceph_dentry_cachep, di);
}
1314
1315 /*
1316  * When the VFS prunes a dentry from the cache, we need to clear the
1317  * complete flag on the parent directory.
1318  *
1319  * Called under dentry->d_lock.
1320  */
1321 static void ceph_d_prune(struct dentry *dentry)
1322 {
1323         dout("ceph_d_prune %p\n", dentry);
1324
1325         /* do we have a valid parent? */
1326         if (IS_ROOT(dentry))
1327                 return;
1328
1329         /* if we are not hashed, we don't affect dir's completeness */
1330         if (d_unhashed(dentry))
1331                 return;
1332
1333         if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
1334                 return;
1335
1336         /*
1337          * we hold d_lock, so d_parent is stable, and d_fsdata is never
1338          * cleared until d_release
1339          */
1340         ceph_dir_clear_complete(d_inode(dentry->d_parent));
1341 }
1342
1343 /*
1344  * read() on a dir.  This weird interface hack only works if mounted
1345  * with '-o dirstat'.
1346  */
/*
 * read() on a directory: returns a formatted recursive-stat summary.
 * Only available with the 'dirstat' mount option; otherwise -EISDIR.
 * The text is rendered once per open file and cached in
 * cf->dir_info for subsequent reads/seeks.
 */
static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
                             loff_t *ppos)
{
        struct ceph_file_info *cf = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        int left;
        const int bufsize = 1024;

        if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
                return -EISDIR;

        if (!cf->dir_info) {
                /* first read on this open file: render the stats */
                cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
                if (!cf->dir_info)
                        return -ENOMEM;
                cf->dir_info_len =
                        snprintf(cf->dir_info, bufsize,
                                "entries:   %20lld\n"
                                " files:    %20lld\n"
                                " subdirs:  %20lld\n"
                                "rentries:  %20lld\n"
                                " rfiles:   %20lld\n"
                                " rsubdirs: %20lld\n"
                                "rbytes:    %20lld\n"
                                "rctime:    %10ld.%09ld\n",
                                ci->i_files + ci->i_subdirs,
                                ci->i_files,
                                ci->i_subdirs,
                                ci->i_rfiles + ci->i_rsubdirs,
                                ci->i_rfiles,
                                ci->i_rsubdirs,
                                ci->i_rbytes,
                                (long)ci->i_rctime.tv_sec,
                                (long)ci->i_rctime.tv_nsec);
        }

        if (*ppos >= cf->dir_info_len)
                return 0;
        size = min_t(unsigned, size, cf->dir_info_len-*ppos);
        left = copy_to_user(buf, cf->dir_info + *ppos, size);
        if (left == size)
                /* nothing at all was copied */
                return -EFAULT;
        /* partial copy is reported as a short read */
        *ppos += (size - left);
        return size - left;
}
1393
1394 /*
1395  * We maintain a private dentry LRU.
1396  *
1397  * FIXME: this needs to be changed to a per-mds lru to be useful.
1398  */
1399 void ceph_dentry_lru_add(struct dentry *dn)
1400 {
1401         struct ceph_dentry_info *di = ceph_dentry(dn);
1402         struct ceph_mds_client *mdsc;
1403
1404         dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn);
1405         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1406         spin_lock(&mdsc->dentry_lru_lock);
1407         list_add_tail(&di->lru, &mdsc->dentry_lru);
1408         mdsc->num_dentry++;
1409         spin_unlock(&mdsc->dentry_lru_lock);
1410 }
1411
1412 void ceph_dentry_lru_touch(struct dentry *dn)
1413 {
1414         struct ceph_dentry_info *di = ceph_dentry(dn);
1415         struct ceph_mds_client *mdsc;
1416
1417         dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn,
1418              di->offset);
1419         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1420         spin_lock(&mdsc->dentry_lru_lock);
1421         list_move_tail(&di->lru, &mdsc->dentry_lru);
1422         spin_unlock(&mdsc->dentry_lru_lock);
1423 }
1424
1425 void ceph_dentry_lru_del(struct dentry *dn)
1426 {
1427         struct ceph_dentry_info *di = ceph_dentry(dn);
1428         struct ceph_mds_client *mdsc;
1429
1430         dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn);
1431         mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
1432         spin_lock(&mdsc->dentry_lru_lock);
1433         list_del_init(&di->lru);
1434         mdsc->num_dentry--;
1435         spin_unlock(&mdsc->dentry_lru_lock);
1436 }
1437
1438 /*
1439  * Return name hash for a given dentry.  This is dependent on
1440  * the parent directory's hash function.
1441  */
1442 unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
1443 {
1444         struct ceph_inode_info *dci = ceph_inode(dir);
1445
1446         switch (dci->i_dir_layout.dl_dir_hash) {
1447         case 0: /* for backward compat */
1448         case CEPH_STR_HASH_LINUX:
1449                 return dn->d_name.hash;
1450
1451         default:
1452                 return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
1453                                      dn->d_name.name, dn->d_name.len);
1454         }
1455 }
1456
/* file operations for regular (non-snap) directories */
const struct file_operations ceph_dir_fops = {
        .read = ceph_read_dir,          /* only useful with -o dirstat */
        .iterate = ceph_readdir,
        .llseek = ceph_dir_llseek,
        .open = ceph_open,
        .release = ceph_release,
        .unlocked_ioctl = ceph_ioctl,
        .fsync = ceph_fsync,
};
1466
/* file operations for the .snap directory (read-only, no ioctl/fsync) */
const struct file_operations ceph_snapdir_fops = {
        .iterate = ceph_readdir,
        .llseek = ceph_dir_llseek,
        .open = ceph_open,
        .release = ceph_release,
};
1473
/* inode operations for regular (non-snap) directories */
const struct inode_operations ceph_dir_iops = {
        .lookup = ceph_lookup,
        .permission = ceph_permission,
        .getattr = ceph_getattr,
        .setattr = ceph_setattr,
        .listxattr = ceph_listxattr,
        .get_acl = ceph_get_acl,
        .set_acl = ceph_set_acl,
        .mknod = ceph_mknod,
        .symlink = ceph_symlink,
        .mkdir = ceph_mkdir,
        .link = ceph_link,
        .unlink = ceph_unlink,
        .rmdir = ceph_unlink,           /* shared: op code chosen at runtime */
        .rename = ceph_rename,
        .create = ceph_create,
        .atomic_open = ceph_atomic_open,
};
1492
/* inode operations for the .snap dir: mkdir/rmdir/rename manage snapshots */
const struct inode_operations ceph_snapdir_iops = {
        .lookup = ceph_lookup,
        .permission = ceph_permission,
        .getattr = ceph_getattr,
        .mkdir = ceph_mkdir,            /* MKSNAP */
        .rmdir = ceph_unlink,           /* RMSNAP */
        .rename = ceph_rename,          /* RENAMESNAP */
};
1501
/* dentry operations: lease-based revalidation plus per-dentry cleanup */
const struct dentry_operations ceph_dentry_ops = {
        .d_revalidate = ceph_d_revalidate,
        .d_release = ceph_d_release,
        .d_prune = ceph_d_prune,        /* clears parent's "complete" flag */
};