ceph: map snapid to anonymous bdev ID
[linux-block.git] / fs / ceph / mds_client.c
index 163fc74bf22174be678f7a826a6eaddc953872c0..f2f57775d2d586c0d5783e5c0c821446bdfa87f6 100644 (file)
@@ -20,6 +20,8 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
 
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
+
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
  * managing the file system namespace (the directory hierarchy and
  */
 
 struct ceph_reconnect_state {
-       int nr_caps;
+       struct ceph_mds_session *session;
+       int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
+       bool allow_multi;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -2777,6 +2781,25 @@ bad:
        pr_err("mdsc_handle_forward decode error err=%d\n", err);
 }
 
+static int __decode_and_drop_session_metadata(void **p, void *end)
+{
+       /* map<string,string> */
+       u32 n;
+       ceph_decode_32_safe(p, end, n, bad);
+       while (n-- > 0) {
+               u32 len;
+               ceph_decode_32_safe(p, end, len, bad);
+               ceph_decode_need(p, end, len, bad);
+               *p += len;
+               ceph_decode_32_safe(p, end, len, bad);
+               ceph_decode_need(p, end, len, bad);
+               *p += len;
+       }
+       return 0;
+bad:
+       return -1;
+}
+
 /*
  * handle a mds session control message
  */
@@ -2784,18 +2807,36 @@ static void handle_session(struct ceph_mds_session *session,
                           struct ceph_msg *msg)
 {
        struct ceph_mds_client *mdsc = session->s_mdsc;
+       int mds = session->s_mds;
+       int msg_version = le16_to_cpu(msg->hdr.version);
+       void *p = msg->front.iov_base;
+       void *end = p + msg->front.iov_len;
+       struct ceph_mds_session_head *h;
        u32 op;
        u64 seq;
-       int mds = session->s_mds;
-       struct ceph_mds_session_head *h = msg->front.iov_base;
+       unsigned long features = 0;
        int wake = 0;
 
        /* decode */
-       if (msg->front.iov_len < sizeof(*h))
-               goto bad;
+       ceph_decode_need(&p, end, sizeof(*h), bad);
+       h = p;
+       p += sizeof(*h);
+
        op = le32_to_cpu(h->op);
        seq = le64_to_cpu(h->seq);
 
+       if (msg_version >= 3) {
+               u32 len;
+               /* version >= 2, metadata */
+               if (__decode_and_drop_session_metadata(&p, end) < 0)
+                       goto bad;
+               /* version >= 3, feature bits */
+               ceph_decode_32_safe(&p, end, len, bad);
+               ceph_decode_need(&p, end, len, bad);
+               memcpy(&features, p, min_t(size_t, len, sizeof(features)));
+               p += len;
+       }
+
        mutex_lock(&mdsc->mutex);
        if (op == CEPH_SESSION_CLOSE) {
                get_session(session);
@@ -2821,6 +2862,7 @@ static void handle_session(struct ceph_mds_session *session,
                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
                        pr_info("mds%d reconnect success\n", session->s_mds);
                session->s_state = CEPH_MDS_SESSION_OPEN;
+               session->s_features = features;
                renewed_caps(mdsc, session, 0);
                wake = 1;
                if (mdsc->stopping)
@@ -2947,6 +2989,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
        mutex_unlock(&mdsc->mutex);
 }
 
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+       struct ceph_msg *reply;
+       struct ceph_pagelist *_pagelist;
+       struct page *page;
+       __le32 *addr;
+       int err = -ENOMEM;
+
+       if (!recon_state->allow_multi)
+               return -ENOSPC;
+
+       /* can't handle message that contains both caps and realms */
+       BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+       /* pre-allocate new pagelist */
+       _pagelist = ceph_pagelist_alloc(GFP_NOFS);
+       if (!_pagelist)
+               return -ENOMEM;
+
+       reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+       if (!reply)
+               goto fail_msg;
+
+       /* placeholder for nr_caps */
+       err = ceph_pagelist_encode_32(_pagelist, 0);
+       if (err < 0)
+               goto fail;
+
+       if (recon_state->nr_caps) {
+               /* currently encoding caps */
+               err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+               if (err)
+                       goto fail;
+       } else {
+               /* placeholder for nr_realms (currently encoding realms) */
+               err = ceph_pagelist_encode_32(_pagelist, 0);
+               if (err < 0)
+                       goto fail;
+       }
+
+       err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+       if (err)
+               goto fail;
+
+       page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+       addr = kmap_atomic(page);
+       if (recon_state->nr_caps) {
+               /* currently encoding caps */
+               *addr = cpu_to_le32(recon_state->nr_caps);
+       } else {
+               /* currently encoding realms */
+               *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+       }
+       kunmap_atomic(addr);
+
+       reply->hdr.version = cpu_to_le16(5);
+       reply->hdr.compat_version = cpu_to_le16(4);
+
+       reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+       ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+       ceph_con_send(&recon_state->session->s_con, reply);
+       ceph_pagelist_release(recon_state->pagelist);
+
+       recon_state->pagelist = _pagelist;
+       recon_state->nr_caps = 0;
+       recon_state->nr_realms = 0;
+       recon_state->msg_version = 5;
+       return 0;
+fail:
+       ceph_msg_put(reply);
+fail_msg:
+       ceph_pagelist_release(_pagelist);
+       return err;
+}
+
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
@@ -2966,9 +3084,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
             inode, ceph_vinop(inode), cap, cap->cap_id,
             ceph_cap_string(cap->issued));
-       err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
-       if (err)
-               return err;
 
        spin_lock(&ci->i_ceph_lock);
        cap->seq = 0;        /* reset cap seq */
@@ -3008,7 +3123,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        if (recon_state->msg_version >= 2) {
                int num_fcntl_locks, num_flock_locks;
                struct ceph_filelock *flocks = NULL;
-               size_t struct_len, total_len = 0;
+               size_t struct_len, total_len = sizeof(u64);
                u8 struct_v = 0;
 
 encode_again:
@@ -3043,7 +3158,7 @@ encode_again:
 
                if (recon_state->msg_version >= 3) {
                        /* version, compat_version and struct_len */
-                       total_len = 2 * sizeof(u8) + sizeof(u32);
+                       total_len += 2 * sizeof(u8) + sizeof(u32);
                        struct_v = 2;
                }
                /*
@@ -3060,12 +3175,19 @@ encode_again:
                        struct_len += sizeof(u64); /* snap_follows */
 
                total_len += struct_len;
-               err = ceph_pagelist_reserve(pagelist, total_len);
-               if (err) {
-                       kfree(flocks);
-                       goto out_err;
+
+               if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+                       err = send_reconnect_partial(recon_state);
+                       if (err)
+                               goto out_freeflocks;
+                       pagelist = recon_state->pagelist;
                }
 
+               err = ceph_pagelist_reserve(pagelist, total_len);
+               if (err)
+                       goto out_freeflocks;
+
+               ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
                if (recon_state->msg_version >= 3) {
                        ceph_pagelist_encode_8(pagelist, struct_v);
                        ceph_pagelist_encode_8(pagelist, 1);
@@ -3077,7 +3199,7 @@ encode_again:
                                       num_fcntl_locks, num_flock_locks);
                if (struct_v >= 2)
                        ceph_pagelist_encode_64(pagelist, snap_follows);
-
+out_freeflocks:
                kfree(flocks);
        } else {
                u64 pathbase = 0;
@@ -3098,20 +3220,81 @@ encode_again:
                }
 
                err = ceph_pagelist_reserve(pagelist,
-                               pathlen + sizeof(u32) + sizeof(rec.v1));
+                                           sizeof(u64) + sizeof(u32) +
+                                           pathlen + sizeof(rec.v1));
                if (err) {
-                       kfree(path);
-                       goto out_err;
+                       goto out_freepath;
                }
 
+               ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
                ceph_pagelist_encode_string(pagelist, path, pathlen);
                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
-
+out_freepath:
                kfree(path);
        }
 
-       recon_state->nr_caps++;
 out_err:
+       if (err >= 0)
+               recon_state->nr_caps++;
+       return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+                             struct ceph_reconnect_state *recon_state)
+{
+       struct rb_node *p;
+       struct ceph_pagelist *pagelist = recon_state->pagelist;
+       int err = 0;
+
+       if (recon_state->msg_version >= 4) {
+               err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+               if (err < 0)
+                       goto fail;
+       }
+
+       /*
+        * snaprealms.  we provide mds with the ino, seq (version), and
+        * parent for all of our realms.  If the mds has any newer info,
+        * it will tell us.
+        */
+       for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+               struct ceph_snap_realm *realm =
+                      rb_entry(p, struct ceph_snap_realm, node);
+               struct ceph_mds_snaprealm_reconnect sr_rec;
+
+               if (recon_state->msg_version >= 4) {
+                       size_t need = sizeof(u8) * 2 + sizeof(u32) +
+                                     sizeof(sr_rec);
+
+                       if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+                               err = send_reconnect_partial(recon_state);
+                               if (err)
+                                       goto fail;
+                               pagelist = recon_state->pagelist;
+                       }
+
+                       err = ceph_pagelist_reserve(pagelist, need);
+                       if (err)
+                               goto fail;
+
+                       ceph_pagelist_encode_8(pagelist, 1);
+                       ceph_pagelist_encode_8(pagelist, 1);
+                       ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+               }
+
+               dout(" adding snap realm %llx seq %lld parent %llx\n",
+                    realm->ino, realm->seq, realm->parent_ino);
+               sr_rec.ino = cpu_to_le64(realm->ino);
+               sr_rec.seq = cpu_to_le64(realm->seq);
+               sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+               err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+               if (err)
+                       goto fail;
+
+               recon_state->nr_realms++;
+       }
+fail:
        return err;
 }
 
@@ -3132,18 +3315,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *session)
 {
        struct ceph_msg *reply;
-       struct rb_node *p;
        int mds = session->s_mds;
        int err = -ENOMEM;
-       int s_nr_caps;
-       struct ceph_pagelist *pagelist;
-       struct ceph_reconnect_state recon_state;
+       struct ceph_reconnect_state recon_state = {
+               .session = session,
+       };
        LIST_HEAD(dispose);
 
        pr_info("mds%d reconnect start\n", mds);
 
-       pagelist = ceph_pagelist_alloc(GFP_NOFS);
-       if (!pagelist)
+       recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+       if (!recon_state.pagelist)
                goto fail_nopagelist;
 
        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
@@ -3187,63 +3369,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        /* replay unsafe requests */
        replay_unsafe_requests(mdsc, session);
 
+       ceph_early_kick_flushing_caps(mdsc, session);
+
        down_read(&mdsc->snap_rwsem);
 
-       /* traverse this session's caps */
-       s_nr_caps = session->s_nr_caps;
-       err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+       /* placeholder for nr_caps */
+       err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
        if (err)
                goto fail;
 
-       recon_state.nr_caps = 0;
-       recon_state.pagelist = pagelist;
-       if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+       if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
                recon_state.msg_version = 3;
-       else
+               recon_state.allow_multi = true;
+       } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+               recon_state.msg_version = 3;
+       } else {
                recon_state.msg_version = 2;
+       }
+       /* traverse this session's caps */
        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
-       if (err < 0)
-               goto fail;
 
        spin_lock(&session->s_cap_lock);
        session->s_cap_reconnect = 0;
        spin_unlock(&session->s_cap_lock);
 
-       /*
-        * snaprealms.  we provide mds with the ino, seq (version), and
-        * parent for all of our realms.  If the mds has any newer info,
-        * it will tell us.
-        */
-       for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
-               struct ceph_snap_realm *realm =
-                       rb_entry(p, struct ceph_snap_realm, node);
-               struct ceph_mds_snaprealm_reconnect sr_rec;
+       if (err < 0)
+               goto fail;
 
-               dout(" adding snap realm %llx seq %lld parent %llx\n",
-                    realm->ino, realm->seq, realm->parent_ino);
-               sr_rec.ino = cpu_to_le64(realm->ino);
-               sr_rec.seq = cpu_to_le64(realm->seq);
-               sr_rec.parent = cpu_to_le64(realm->parent_ino);
-               err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
-               if (err)
-                       goto fail;
+       /* check if all realms can be encoded into current message */
+       if (mdsc->num_snap_realms) {
+               size_t total_len =
+                       recon_state.pagelist->length +
+                       mdsc->num_snap_realms *
+                       sizeof(struct ceph_mds_snaprealm_reconnect);
+               if (recon_state.msg_version >= 4) {
+                       /* number of realms */
+                       total_len += sizeof(u32);
+                       /* version, compat_version and struct_len */
+                       total_len += mdsc->num_snap_realms *
+                                    (2 * sizeof(u8) + sizeof(u32));
+               }
+               if (total_len > RECONNECT_MAX_SIZE) {
+                       if (!recon_state.allow_multi) {
+                               err = -ENOSPC;
+                               goto fail;
+                       }
+                       if (recon_state.nr_caps) {
+                               err = send_reconnect_partial(&recon_state);
+                               if (err)
+                                       goto fail;
+                       }
+                       recon_state.msg_version = 5;
+               }
        }
 
-       reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+       err = encode_snap_realms(mdsc, &recon_state);
+       if (err < 0)
+               goto fail;
 
-       /* raced with cap release? */
-       if (s_nr_caps != recon_state.nr_caps) {
-               struct page *page = list_first_entry(&pagelist->head,
-                                                    struct page, lru);
+       if (recon_state.msg_version >= 5) {
+               err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+               if (err < 0)
+                       goto fail;
+       }
+
+       if (recon_state.nr_caps || recon_state.nr_realms) {
+               struct page *page =
+                       list_first_entry(&recon_state.pagelist->head,
+                                       struct page, lru);
                __le32 *addr = kmap_atomic(page);
-               *addr = cpu_to_le32(recon_state.nr_caps);
+               if (recon_state.nr_caps) {
+                       WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+                       *addr = cpu_to_le32(recon_state.nr_caps);
+               } else if (recon_state.msg_version >= 4) {
+                       *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+               }
                kunmap_atomic(addr);
        }
 
-       reply->hdr.data_len = cpu_to_le32(pagelist->length);
-       ceph_msg_data_add_pagelist(reply, pagelist);
+       reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+       if (recon_state.msg_version >= 4)
+               reply->hdr.compat_version = cpu_to_le16(4);
 
-       ceph_early_kick_flushing_caps(mdsc, session);
+       reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+       ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
 
        ceph_con_send(&session->s_con, reply);
 
@@ -3254,7 +3463,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        mutex_unlock(&mdsc->mutex);
 
        up_read(&mdsc->snap_rwsem);
-       ceph_pagelist_release(pagelist);
+       ceph_pagelist_release(recon_state.pagelist);
        return;
 
 fail:
@@ -3262,7 +3471,7 @@ fail:
        up_read(&mdsc->snap_rwsem);
        mutex_unlock(&session->s_mutex);
 fail_nomsg:
-       ceph_pagelist_release(pagelist);
+       ceph_pagelist_release(recon_state.pagelist);
 fail_nopagelist:
        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
        return;
@@ -3582,6 +3791,8 @@ static void delayed_work(struct work_struct *work)
        dout("mdsc delayed_work\n");
        ceph_check_delayed_caps(mdsc);
 
+       ceph_trim_snapid_map(mdsc);
+
        mutex_lock(&mdsc->mutex);
        renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
        renew_caps = time_after_eq(jiffies, HZ*renew_interval +
@@ -3660,6 +3871,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
+       mdsc->num_snap_realms = 0;
        spin_lock_init(&mdsc->snap_empty_lock);
        mdsc->last_tid = 0;
        mdsc->oldest_tid = 0;
@@ -3683,6 +3895,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        ceph_caps_init(mdsc);
        ceph_adjust_min_caps(mdsc, fsc->min_caps);
 
+       spin_lock_init(&mdsc->snapid_map_lock);
+       mdsc->snapid_map_tree = RB_ROOT;
+       INIT_LIST_HEAD(&mdsc->snapid_map_lru);
+
        init_rwsem(&mdsc->pool_perm_rwsem);
        mdsc->pool_perm_tree = RB_ROOT;
 
@@ -3876,6 +4092,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
        mutex_unlock(&mdsc->mutex);
 
+       ceph_cleanup_snapid_map(mdsc);
+
        ceph_cleanup_empty_realms(mdsc);
 
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */