Merge branch 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage...
author    Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 27 Nov 2013 02:02:46 +0000 (18:02 -0800)
committer Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 27 Nov 2013 02:02:46 +0000 (18:02 -0800)
Pull ceph bug-fixes from Sage Weil:
 "These include a couple fixes to the new fscache code that went in
  during the last cycle (which will need to go stable@ shortly as well),
  a couple client-side directory fragmentation fixes, a fix for a race
  in the cap release queuing path, and a couple race fixes in the
  request abort and resend code.

  Obviously some of this could have gone into 3.12 final, but I
  preferred to overtest rather than send things in for a late -rc, and
  then my travel schedule intervened"

* 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: allocate non-zero page to fscache in readpage()
  ceph: wake up 'safe' waiters when unregistering request
  ceph: cleanup aborted requests when re-sending requests.
  ceph: handle race between cap reconnect and cap release
  ceph: set caps count after composing cap reconnect message
  ceph: queue cap release in __ceph_remove_cap()
  ceph: handle frag mismatch between readdir request and reply
  ceph: remove outdated frag information
  ceph: hung on ceph fscache invalidate in some cases
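
A note on the request-path entries above ("wake up 'safe' waiters when
unregistering request", "cleanup aborted requests when re-sending requests"):
both close a window where a waiter could sleep forever on a request that is
torn down without ever receiving a safe reply.  The pthreads sketch below is
only a model of that idea, not the kernel's completion API; every name in it
is invented for illustration.

/*
 * Standalone model: whatever path unregisters a request must signal the
 * "safe" waiters, so aborted requests that never get a safe reply still
 * wake anyone sleeping on them (the kernel uses struct completion and
 * complete_all() for this).
 */
#include <pthread.h>
#include <stdbool.h>

struct req_model {
	pthread_mutex_t lock;
	pthread_cond_t  safe;		/* models req->r_safe_completion */
	bool            safe_done;
};

static void unregister_request(struct req_model *req)
{
	/* ... drop the request from the client's tree, put references ... */
	pthread_mutex_lock(&req->lock);
	req->safe_done = true;			/* models complete_all() */
	pthread_cond_broadcast(&req->safe);
	pthread_mutex_unlock(&req->lock);
}

static void wait_for_safe(struct req_model *req)
{
	pthread_mutex_lock(&req->lock);
	while (!req->safe_done)
		pthread_cond_wait(&req->safe, &req->lock);
	pthread_mutex_unlock(&req->lock);
}

In the mds_client.c diff below this shows up as moving
complete_all(&req->r_safe_completion) from the safe-reply branch of
handle_reply() into __unregister_request(), and as __do_request()
unregistering requests that were aborted before they could be resent.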

fs/ceph/addr.c
fs/ceph/cache.c
fs/ceph/caps.c
fs/ceph/dir.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6df8bd481425379006912990ee6f9461eaf3cf1b..1e561c059539542e83a118edb003ffabca08506b 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -216,7 +216,7 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        }
        SetPageUptodate(page);
 
-       if (err == 0)
+       if (err >= 0)
                ceph_readpage_to_fscache(inode, page);
 
 out:
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7db2e6ca4b8f0b07146c137a80e24567a03d3e43..8c44fdd4e1c39f836b2c8a9b2a7a025f1844d3b3 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -324,6 +324,9 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
 
+       if (!PageFsCache(page))
+               return;
+
        fscache_wait_on_page_write(ci->fscache, page);
        fscache_uncache_page(ci->fscache, page);
 }
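
Both fscache hunks above are guard fixes: readpage now hands the page to
fscache for any non-negative read result rather than only when zero bytes
were read (a short read leaves the page uptodate with a zero-filled tail),
and invalidation returns early unless the page actually carries the fscache
flag, which appears to be the hang the "hung on ceph fscache invalidate"
entry refers to.  A minimal standalone sketch of the two checks, with
made-up helper names:

/* Illustrative sketch only, not the kernel functions. */
#include <stdbool.h>

/* addr.c: read_result >= 0 means the page is uptodate, so it is worth
 * pushing into fscache; a negative errno is not. */
static bool worth_caching(int read_result)
{
	return read_result >= 0;
}

/* cache.c: skip the potentially blocking fscache wait entirely when the
 * page was never handed to fscache (PageFsCache() in the kernel). */
static void invalidate_cached_page(bool page_has_fscache_flag)
{
	if (!page_has_fscache_flag)
		return;
	/* ... wait for fscache write-out, then uncache the page ... */
}
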
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 13976c33332ec1fd7ca3999053b15b7079c5ab31..3c0a4bd7499645ca8bf90fd1a6ba16f6831c164c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -897,7 +897,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
  * caller should hold i_ceph_lock.
  * caller will not hold session s_mutex if called from destroy_inode.
  */
-void __ceph_remove_cap(struct ceph_cap *cap)
+void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 {
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
@@ -909,6 +909,16 @@ void __ceph_remove_cap(struct ceph_cap *cap)
 
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
+       /*
+        * s_cap_reconnect is protected by s_cap_lock. no one changes
+        * s_cap_gen while session is in the reconnect state.
+        */
+       if (queue_release &&
+           (!session->s_cap_reconnect ||
+            cap->cap_gen == session->s_cap_gen))
+               __queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
+                                   cap->mseq, cap->issue_seq);
+
        if (session->s_cap_iterator == cap) {
                /* not yet, we are iterating over this very cap */
                dout("__ceph_remove_cap  delaying %p removal from session %p\n",
@@ -1023,7 +1033,6 @@ void __queue_cap_release(struct ceph_mds_session *session,
        struct ceph_mds_cap_release *head;
        struct ceph_mds_cap_item *item;
 
-       spin_lock(&session->s_cap_lock);
        BUG_ON(!session->s_num_cap_releases);
        msg = list_first_entry(&session->s_cap_releases,
                               struct ceph_msg, list_head);
@@ -1052,7 +1061,6 @@ void __queue_cap_release(struct ceph_mds_session *session,
                     (int)CEPH_CAPS_PER_RELEASE,
                     (int)msg->front.iov_len);
        }
-       spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1067,12 +1075,8 @@ void ceph_queue_caps_release(struct inode *inode)
        p = rb_first(&ci->i_caps);
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
-               struct ceph_mds_session *session = cap->session;
-
-               __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
-                                   cap->mseq, cap->issue_seq);
                p = rb_next(p);
-               __ceph_remove_cap(cap);
+               __ceph_remove_cap(cap, true);
        }
 }
 
@@ -2791,7 +2795,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        }
                        spin_unlock(&mdsc->cap_dirty_lock);
                }
-               __ceph_remove_cap(cap);
+               __ceph_remove_cap(cap, false);
        }
        /* else, we already released it */
 
@@ -2931,9 +2935,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (!inode) {
                dout(" i don't have ino %llx\n", vino.ino);
 
-               if (op == CEPH_CAP_OP_IMPORT)
+               if (op == CEPH_CAP_OP_IMPORT) {
+                       spin_lock(&session->s_cap_lock);
                        __queue_cap_release(session, vino.ino, cap_id,
                                            mseq, seq);
+                       spin_unlock(&session->s_cap_lock);
+               }
                goto flush_cap_releases;
        }
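
The new queue_release argument, together with the s_cap_reconnect comment
above, is the heart of the "handle race between cap reconnect and cap
release" fix: while a reconnect message is being composed, a release is
queued only for caps that were already stamped with the current s_cap_gen
(i.e. already written into the reconnect message); a cap that has not been
added yet is dropped silently, since the MDS will never hear about it.  A
standalone model of that decision, with field names that merely mirror the
kernel's:

/*
 * Illustrative model of the queue-release decision in __ceph_remove_cap();
 * call it with the (modelled) s_cap_lock held, as the real code does.
 */
#include <stdbool.h>

struct session_model {
	bool     cap_reconnect;	/* s_cap_reconnect: reconnect in progress */
	unsigned cap_gen;	/* s_cap_gen: bumped when reconnect starts */
};

struct cap_model {
	unsigned cap_gen;	/* set to s_cap_gen once the cap has been
				 * encoded into the reconnect message */
};

static bool should_queue_release(const struct session_model *s,
				 const struct cap_model *cap,
				 bool queue_release)
{
	if (!queue_release)
		return false;		/* caller asked not to queue one */
	if (!s->cap_reconnect)
		return true;		/* no reconnect being composed */
	/* cap already in the reconnect message -> the MDS needs a release;
	 * otherwise the cap was never reported, so nothing to release. */
	return cap->cap_gen == s->cap_gen;
}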
 
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 868b61d56cac77f3a8328d5ba4851ec7947fe827..2a0bcaeb189acd18b124aff8d54619667fd97bf2 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -352,8 +352,18 @@ more:
                }
 
                /* note next offset and last dentry name */
+               rinfo = &req->r_reply_info;
+               if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+                       frag = le32_to_cpu(rinfo->dir_dir->frag);
+                       if (ceph_frag_is_leftmost(frag))
+                               fi->next_offset = 2;
+                       else
+                               fi->next_offset = 0;
+                       off = fi->next_offset;
+               }
                fi->offset = fi->next_offset;
                fi->last_readdir = req;
+               fi->frag = frag;
 
                if (req->r_reply_info.dir_end) {
                        kfree(fi->last_name);
@@ -363,7 +373,6 @@ more:
                        else
                                fi->next_offset = 0;
                } else {
-                       rinfo = &req->r_reply_info;
                        err = note_last_dentry(fi,
                                       rinfo->dir_dname[rinfo->dir_nr-1],
                                       rinfo->dir_dname_len[rinfo->dir_nr-1]);
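
The hunk above copes with a reply that comes back for a different fragment
than the one requested (the MDS may have split or merged the directory in
the meantime): the readdir position is reset to the start of the returned
fragment, which is offset 2 for the leftmost fragment because ceph_readdir()
treats offsets 0 and 1 as "." and "..".  A standalone sketch of that reset;
the leftmost test is an assumption based on the in-kernel
ceph_frag_is_leftmost():

/*
 * Illustrative sketch.  A ceph frag id packs the number of split bits in
 * the top byte and the fragment value in the low 24 bits; the leftmost
 * fragment is assumed to be the one whose value part is zero.
 */
#include <stdint.h>

static int frag_is_leftmost(uint32_t frag)
{
	return (frag & 0x00ffffffu) == 0;
}

/* First usable readdir offset within a fragment: 0 and 1 exist only in
 * the leftmost fragment, where they stand for "." and "..". */
static unsigned int first_offset_for_frag(uint32_t frag)
{
	return frag_is_leftmost(frag) ? 2 : 0;
}
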
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 8549a48115f71b23e1f35ef444caf3eb32dbced3..9a8e396aed89a43a0c824c3b682f96ac817ebc1c 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -577,6 +577,8 @@ static int fill_inode(struct inode *inode,
        int issued = 0, implemented;
        struct timespec mtime, atime, ctime;
        u32 nsplits;
+       struct ceph_inode_frag *frag;
+       struct rb_node *rb_node;
        struct ceph_buffer *xattr_blob = NULL;
        int err = 0;
        int queue_trunc = 0;
@@ -751,15 +753,38 @@ no_change:
        /* FIXME: move me up, if/when version reflects fragtree changes */
        nsplits = le32_to_cpu(info->fragtree.nsplits);
        mutex_lock(&ci->i_fragtree_mutex);
+       rb_node = rb_first(&ci->i_fragtree);
        for (i = 0; i < nsplits; i++) {
                u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
-
-               if (IS_ERR(frag))
-                       continue;
+               frag = NULL;
+               while (rb_node) {
+                       frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+                       if (ceph_frag_compare(frag->frag, id) >= 0) {
+                               if (frag->frag != id)
+                                       frag = NULL;
+                               else
+                                       rb_node = rb_next(rb_node);
+                               break;
+                       }
+                       rb_node = rb_next(rb_node);
+                       rb_erase(&frag->node, &ci->i_fragtree);
+                       kfree(frag);
+                       frag = NULL;
+               }
+               if (!frag) {
+                       frag = __get_or_create_frag(ci, id);
+                       if (IS_ERR(frag))
+                               continue;
+               }
                frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
                dout(" frag %x split by %d\n", frag->frag, frag->split_by);
        }
+       while (rb_node) {
+               frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+               rb_node = rb_next(rb_node);
+               rb_erase(&frag->node, &ci->i_fragtree);
+               kfree(frag);
+       }
        mutex_unlock(&ci->i_fragtree_mutex);
 
        /* were we issued a capability? */
@@ -1250,8 +1275,20 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        int err = 0, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       u64 frag = le32_to_cpu(rhead->args.readdir.frag);
        struct ceph_dentry_info *di;
+       u64 r_readdir_offset = req->r_readdir_offset;
+       u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+
+       if (rinfo->dir_dir &&
+           le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+               dout("readdir_prepopulate got new frag %x -> %x\n",
+                    frag, le32_to_cpu(rinfo->dir_dir->frag));
+               frag = le32_to_cpu(rinfo->dir_dir->frag);
+               if (ceph_frag_is_leftmost(frag))
+                       r_readdir_offset = 2;
+               else
+                       r_readdir_offset = 0;
+       }
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
@@ -1315,7 +1352,7 @@ retry_lookup:
                }
 
                di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+               di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
 
                /* inode */
                if (dn->d_inode) {
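
Two things happen in the inode.c diff above.  fill_inode() now merges the
splits reported by the MDS into the existing i_fragtree in one ordered pass,
erasing fragments the reply no longer mentions ("remove outdated frag
information").  ceph_readdir_prepopulate() applies the same
leftmost-fragment offset rule sketched after the dir.c diff when the reply
covers a different fragment than the request.  Below is a standalone sketch
of the merge over plain sorted arrays; it only illustrates the shape of the
walk, not the rbtree code:

/*
 * Illustrative merge: both sequences are sorted by frag id.  Old entries
 * that the reply no longer lists are dropped, matching entries are kept
 * (updated in place in the kernel), and new entries are created.
 */
#include <stddef.h>
#include <stdint.h>

static size_t merge_fragtree(const uint32_t *old, size_t nold,
			     const uint32_t *reply, size_t nreply,
			     uint32_t *out)
{
	size_t i = 0, j = 0, n = 0;

	while (j < nreply) {
		while (i < nold && old[i] < reply[j])
			i++;			/* outdated frag: erased */
		if (i < nold && old[i] == reply[j])
			i++;			/* match: reuse/update */
		out[n++] = reply[j++];		/* keep the reported frag */
	}
	/* anything still left in "old" here is outdated as well */
	return n;
}
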
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index b7bda5d9611da031aaf6f104ece9fa6351993070..d90861f452107cc47b7242e8ea66dc1257f7c235 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -43,6 +43,7 @@
  */
 
 struct ceph_reconnect_state {
+       int nr_caps;
        struct ceph_pagelist *pagelist;
        bool flock;
 };
@@ -443,6 +444,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
+       s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
@@ -642,6 +644,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
                req->r_unsafe_dir = NULL;
        }
 
+       complete_all(&req->r_safe_completion);
+
        ceph_mdsc_put_request(req);
 }
 
@@ -986,7 +990,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
-       __ceph_remove_cap(cap);
+       __ceph_remove_cap(cap, false);
        if (!__ceph_is_any_real_caps(ci)) {
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1231,9 +1235,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
-                                   cap->mseq, cap->issue_seq);
-               __ceph_remove_cap(cap);
+               __ceph_remove_cap(cap, true);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&ci->i_ceph_lock);
@@ -1416,7 +1418,6 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
        unsigned num;
 
        dout("discard_cap_releases mds%d\n", session->s_mds);
-       spin_lock(&session->s_cap_lock);
 
        /* zero out the in-progress message */
        msg = list_first_entry(&session->s_cap_releases,
@@ -1443,8 +1444,6 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
                msg->front.iov_len = sizeof(*head);
                list_add(&msg->list_head, &session->s_cap_releases);
        }
-
-       spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1875,8 +1874,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
        int mds = -1;
        int err = -EAGAIN;
 
-       if (req->r_err || req->r_got_result)
+       if (req->r_err || req->r_got_result) {
+               if (req->r_aborted)
+                       __unregister_request(mdsc, req);
                goto out;
+       }
 
        if (req->r_timeout &&
            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
@@ -2186,7 +2188,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
-               complete_all(&req->r_safe_completion);
 
                if (req->r_got_unsafe) {
                        /*
@@ -2238,8 +2239,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
-                                   req->r_op == CEPH_MDS_OP_LSSNAP) &&
-                   rinfo->dir_nr)
+                                   req->r_op == CEPH_MDS_OP_LSSNAP))
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
@@ -2490,6 +2490,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        cap->seq = 0;        /* reset cap seq */
        cap->issue_seq = 0;  /* and issue_seq */
        cap->mseq = 0;       /* and migrate_seq */
+       cap->cap_gen = cap->session->s_cap_gen;
 
        if (recon_state->flock) {
                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -2552,6 +2553,8 @@ encode_again:
        } else {
                err = ceph_pagelist_append(pagelist, &rec, reclen);
        }
+
+       recon_state->nr_caps++;
 out_free:
        kfree(path);
 out_dput:
@@ -2579,6 +2582,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        struct rb_node *p;
        int mds = session->s_mds;
        int err = -ENOMEM;
+       int s_nr_caps;
        struct ceph_pagelist *pagelist;
        struct ceph_reconnect_state recon_state;
 
@@ -2610,20 +2614,38 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        dout("session %p state %s\n", session,
             session_state_name(session->s_state));
 
+       spin_lock(&session->s_gen_ttl_lock);
+       session->s_cap_gen++;
+       spin_unlock(&session->s_gen_ttl_lock);
+
+       spin_lock(&session->s_cap_lock);
+       /*
+        * notify __ceph_remove_cap() that we are composing cap reconnect.
+        * If a cap get released before being added to the cap reconnect,
+        * __ceph_remove_cap() should skip queuing cap release.
+        */
+       session->s_cap_reconnect = 1;
        /* drop old cap expires; we're about to reestablish that state */
        discard_cap_releases(mdsc, session);
+       spin_unlock(&session->s_cap_lock);
 
        /* traverse this session's caps */
-       err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+       s_nr_caps = session->s_nr_caps;
+       err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
        if (err)
                goto fail;
 
+       recon_state.nr_caps = 0;
        recon_state.pagelist = pagelist;
        recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
        if (err < 0)
                goto fail;
 
+       spin_lock(&session->s_cap_lock);
+       session->s_cap_reconnect = 0;
+       spin_unlock(&session->s_cap_lock);
+
        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
         * parent for all of our realms.  If the mds has any newer info,
@@ -2646,11 +2668,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
        if (recon_state.flock)
                reply->hdr.version = cpu_to_le16(2);
-       if (pagelist->length) {
-               /* set up outbound data if we have any */
-               reply->hdr.data_len = cpu_to_le32(pagelist->length);
-               ceph_msg_data_add_pagelist(reply, pagelist);
+
+       /* raced with cap release? */
+       if (s_nr_caps != recon_state.nr_caps) {
+               struct page *page = list_first_entry(&pagelist->head,
+                                                    struct page, lru);
+               __le32 *addr = kmap_atomic(page);
+               *addr = cpu_to_le32(recon_state.nr_caps);
+               kunmap_atomic(addr);
        }
+
+       reply->hdr.data_len = cpu_to_le32(pagelist->length);
+       ceph_msg_data_add_pagelist(reply, pagelist);
        ceph_con_send(&session->s_con, reply);
 
        mutex_unlock(&session->s_mutex);
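
One more detail of the reconnect path above ("set caps count after composing
cap reconnect message"): the cap count is encoded optimistically from
s_nr_caps before the caps are walked, encode_caps_cb() counts what actually
got encoded in recon_state.nr_caps, and if releases raced with the walk the
32-bit count at the start of the first pagelist page is patched before the
message is sent.  The same "reserve a length field, fix it up afterwards"
pattern in standalone form, with invented names:

/*
 * Illustrative sketch of "encode a count first, patch it afterwards"; the
 * kernel does the patch by kmapping the first pagelist page and
 * overwriting the little-endian count in place.
 */
#include <stddef.h>
#include <stdint.h>

struct blob {
	uint8_t buf[4096];
	size_t  len;
};

static void put_le32(uint8_t *dst, uint32_t v)
{
	dst[0] = (uint8_t)v;
	dst[1] = (uint8_t)(v >> 8);
	dst[2] = (uint8_t)(v >> 16);
	dst[3] = (uint8_t)(v >> 24);
}

static void encode_le32(struct blob *b, uint32_t v)
{
	put_le32(b->buf + b->len, v);
	b->len += 4;
}

static size_t encode_caps(struct blob *b, size_t expected_caps)
{
	size_t count_off = b->len;	/* where the cap count lives */
	size_t encoded = 0;

	encode_le32(b, (uint32_t)expected_caps);	/* optimistic count */

	/*
	 * ... one record per cap would be encoded here; caps released
	 * while the session is walked are skipped, so "encoded" can end
	 * up smaller than expected_caps ...
	 */

	if (encoded != expected_caps)		/* raced with cap release? */
		put_le32(b->buf + count_off, (uint32_t)encoded);

	return encoded;
}
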
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index c2a19fbbe5177b619b7a3d7e6132b626df8c8508..4c053d099ae4e60400dbcbdcce21844138ba8a47 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -132,6 +132,7 @@ struct ceph_mds_session {
        struct list_head  s_caps;     /* all caps issued by this session */
        int               s_nr_caps, s_trim_caps;
        int               s_num_cap_releases;
+       int               s_cap_reconnect;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
        struct list_head  s_cap_releases_done; /* ready to send */
        struct ceph_cap  *s_cap_iterator;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6014b0a3c405cb12dfb62fdac7887f83a4977b96..ef4ac38bb614a911680668fe52f6e7fa272d94ce 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -741,13 +741,7 @@ extern int ceph_add_cap(struct inode *inode,
                        int fmode, unsigned issued, unsigned wanted,
                        unsigned cap, unsigned seq, u64 realmino, int flags,
                        struct ceph_cap_reservation *caps_reservation);
-extern void __ceph_remove_cap(struct ceph_cap *cap);
-static inline void ceph_remove_cap(struct ceph_cap *cap)
-{
-       spin_lock(&cap->ci->i_ceph_lock);
-       __ceph_remove_cap(cap);
-       spin_unlock(&cap->ci->i_ceph_lock);
-}
+extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);