libceph: monc hunt rate is 3s with backoff up to 30s
[linux-2.6-block.git] / net / ceph / mon_client.c
index de85dddc3dc08cfeee81435c4475a6d975c4cd96..fd1cf408fd899ab790d2bcea4a1514190f513a9e 100644 (file)
@@ -122,51 +122,80 @@ static void __close_session(struct ceph_mon_client *monc)
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);
-       monc->cur_mon = -1;
+
        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
 }
 
 /*
- * Open a session with a (new) monitor.
+ * Pick a new monitor at random and set cur_mon.  If we are repicking
+ * (i.e. cur_mon is already set), be sure to pick a different one.
  */
-static int __open_session(struct ceph_mon_client *monc)
+static void pick_new_mon(struct ceph_mon_client *monc)
 {
-       char r;
-       int ret;
+       int old_mon = monc->cur_mon;
 
-       if (monc->cur_mon < 0) {
-               get_random_bytes(&r, 1);
-               monc->cur_mon = r % monc->monmap->num_mon;
-               dout("open_session num=%d r=%d -> mon%d\n",
-                    monc->monmap->num_mon, r, monc->cur_mon);
-               monc->sub_sent = 0;
-               monc->sub_renew_after = jiffies;  /* i.e., expired */
-               monc->want_next_osdmap = !!monc->want_next_osdmap;
-
-               dout("open_session mon%d opening\n", monc->cur_mon);
-               ceph_con_open(&monc->con,
-                             CEPH_ENTITY_TYPE_MON, monc->cur_mon,
-                             &monc->monmap->mon_inst[monc->cur_mon].addr);
-
-               /* send an initial keepalive to ensure our timestamp is
-                * valid by the time we are in an OPENED state */
-               ceph_con_keepalive(&monc->con);
-
-               /* initiatiate authentication handshake */
-               ret = ceph_auth_build_hello(monc->auth,
-                                           monc->m_auth->front.iov_base,
-                                           monc->m_auth->front_alloc_len);
-               __send_prepared_auth_request(monc, ret);
+       BUG_ON(monc->monmap->num_mon < 1);
+
+       if (monc->monmap->num_mon == 1) {
+               monc->cur_mon = 0;
        } else {
-               dout("open_session mon%d already open\n", monc->cur_mon);
+               int max = monc->monmap->num_mon;
+               int o = -1;
+               int n;
+
+               if (monc->cur_mon >= 0) {
+                       if (monc->cur_mon < monc->monmap->num_mon)
+                               o = monc->cur_mon;
+                       if (o >= 0)
+                               max--;
+               }
+
+               n = prandom_u32() % max;
+               if (o >= 0 && n >= o)
+                       n++;
+
+               monc->cur_mon = n;
        }
-       return 0;
+
+       dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
+            monc->cur_mon, monc->monmap->num_mon);
 }
 
-static bool __sub_expired(struct ceph_mon_client *monc)
+/*
+ * Open a session with a new monitor.
+ */
+static void __open_session(struct ceph_mon_client *monc)
 {
-       return time_after_eq(jiffies, monc->sub_renew_after);
+       int ret;
+
+       pick_new_mon(monc);
+
+       if (monc->had_a_connection) {
+               monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
+               if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
+                       monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
+       }
+
+       monc->sub_renew_after = jiffies; /* i.e., expired */
+       monc->sub_renew_sent = 0;
+
+       dout("%s opening mon%d\n", __func__, monc->cur_mon);
+       ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
+                     &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+       /*
+        * send an initial keepalive to ensure our timestamp is valid
+        * by the time we are in an OPENED state
+        */
+       ceph_con_keepalive(&monc->con);
+
+       /* initiate authentication handshake */
+       ret = ceph_auth_build_hello(monc->auth,
+                                   monc->m_auth->front.iov_base,
+                                   monc->m_auth->front_alloc_len);
+       BUG_ON(ret <= 0);
+       __send_prepared_auth_request(monc, ret);
 }
 
 /*
@@ -174,74 +203,70 @@ static bool __sub_expired(struct ceph_mon_client *monc)
  */
 static void __schedule_delayed(struct ceph_mon_client *monc)
 {
-       struct ceph_options *opt = monc->client->options;
        unsigned long delay;
 
-       if (monc->cur_mon < 0 || __sub_expired(monc)) {
-               delay = 10 * HZ;
-       } else {
-               delay = 20 * HZ;
-               if (opt->monc_ping_timeout > 0)
-                       delay = min(delay, opt->monc_ping_timeout / 3);
-       }
+       if (monc->hunting)
+               delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
+       else
+               delay = CEPH_MONC_PING_INTERVAL;
+
        dout("__schedule_delayed after %lu\n", delay);
        schedule_delayed_work(&monc->delayed_work,
                              round_jiffies_relative(delay));
 }
 
+const char *ceph_sub_str[] = {
+       [CEPH_SUB_MDSMAP] = "mdsmap",
+       [CEPH_SUB_MONMAP] = "monmap",
+       [CEPH_SUB_OSDMAP] = "osdmap",
+};
+
 /*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
  */
 static void __send_subscribe(struct ceph_mon_client *monc)
 {
-       dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-            (unsigned int)monc->sub_sent, __sub_expired(monc),
-            monc->want_next_osdmap);
-       if ((__sub_expired(monc) && !monc->sub_sent) ||
-           monc->want_next_osdmap == 1) {
-               struct ceph_msg *msg = monc->m_subscribe;
-               struct ceph_mon_subscribe_item *i;
-               void *p, *end;
-               int num;
-
-               p = msg->front.iov_base;
-               end = p + msg->front_alloc_len;
-
-               num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
-               ceph_encode_32(&p, num);
-
-               if (monc->want_next_osdmap) {
-                       dout("__send_subscribe to 'osdmap' %u\n",
-                            (unsigned int)monc->have_osdmap);
-                       ceph_encode_string(&p, end, "osdmap", 6);
-                       i = p;
-                       i->have = cpu_to_le64(monc->have_osdmap);
-                       i->onetime = 1;
-                       p += sizeof(*i);
-                       monc->want_next_osdmap = 2;  /* requested */
-               }
-               if (monc->want_mdsmap) {
-                       dout("__send_subscribe to 'mdsmap' %u+\n",
-                            (unsigned int)monc->have_mdsmap);
-                       ceph_encode_string(&p, end, "mdsmap", 6);
-                       i = p;
-                       i->have = cpu_to_le64(monc->have_mdsmap);
-                       i->onetime = 0;
-                       p += sizeof(*i);
-               }
-               ceph_encode_string(&p, end, "monmap", 6);
-               i = p;
-               i->have = 0;
-               i->onetime = 0;
-               p += sizeof(*i);
-
-               msg->front.iov_len = p - msg->front.iov_base;
-               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-               ceph_msg_revoke(msg);
-               ceph_con_send(&monc->con, ceph_msg_get(msg));
-
-               monc->sub_sent = jiffies | 1;  /* never 0 */
+       struct ceph_msg *msg = monc->m_subscribe;
+       void *p = msg->front.iov_base;
+       void *const end = p + msg->front_alloc_len;
+       int num = 0;
+       int i;
+
+       dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
+
+       BUG_ON(monc->cur_mon < 0);
+
+       if (!monc->sub_renew_sent)
+               monc->sub_renew_sent = jiffies | 1; /* never 0 */
+
+       msg->hdr.version = cpu_to_le16(2);
+
+       for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+               if (monc->subs[i].want)
+                       num++;
+       }
+       BUG_ON(num < 1); /* monmap sub is always there */
+       ceph_encode_32(&p, num);
+       for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+               const char *s = ceph_sub_str[i];
+
+               if (!monc->subs[i].want)
+                       continue;
+
+               dout("%s %s start %llu flags 0x%x\n", __func__, s,
+                    le64_to_cpu(monc->subs[i].item.start),
+                    monc->subs[i].item.flags);
+               ceph_encode_string(&p, end, s, strlen(s));
+               memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+               p += sizeof(monc->subs[i].item);
        }
+
+       BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+       msg->front.iov_len = p - msg->front.iov_base;
+       msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+       ceph_msg_revoke(msg);
+       ceph_con_send(&monc->con, ceph_msg_get(msg));
 }
 
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,15 +280,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
        seconds = le32_to_cpu(h->duration);
 
        mutex_lock(&monc->mutex);
-       if (monc->hunting) {
-               pr_info("mon%d %s session established\n",
-                       monc->cur_mon,
-                       ceph_pr_addr(&monc->con.peer_addr.in_addr));
-               monc->hunting = false;
+       if (monc->sub_renew_sent) {
+               monc->sub_renew_after = monc->sub_renew_sent +
+                                           (seconds >> 1) * HZ - 1;
+               dout("%s sent %lu duration %d renew after %lu\n", __func__,
+                    monc->sub_renew_sent, seconds, monc->sub_renew_after);
+               monc->sub_renew_sent = 0;
+       } else {
+               dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+                    monc->sub_renew_sent, monc->sub_renew_after);
        }
-       dout("handle_subscribe_ack after %d seconds\n", seconds);
-       monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
-       monc->sub_sent = 0;
        mutex_unlock(&monc->mutex);
        return;
 bad:
@@ -272,36 +298,82 @@ bad:
 }
 
 /*
- * Keep track of which maps we have
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
  */
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+                                u32 epoch, bool continuous)
+{
+       __le64 start = cpu_to_le64(epoch);
+       u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
+
+       dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+            epoch, continuous);
+
+       if (monc->subs[sub].want &&
+           monc->subs[sub].item.start == start &&
+           monc->subs[sub].item.flags == flags)
+               return false;
+
+       monc->subs[sub].item.start = start;
+       monc->subs[sub].item.flags = flags;
+       monc->subs[sub].want = true;
+
+       return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+                       bool continuous)
 {
+       bool need_request;
+
        mutex_lock(&monc->mutex);
-       monc->have_mdsmap = got;
+       need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);
-       return 0;
+
+       return need_request;
+}
+EXPORT_SYMBOL(ceph_monc_want_map);
+
+/*
+ * Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
+ */
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+                               u32 epoch)
+{
+       dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
+       if (monc->subs[sub].want) {
+               if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+                       monc->subs[sub].want = false;
+               else
+                       monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+       }
+
+       monc->subs[sub].have = epoch;
 }
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
 
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 {
        mutex_lock(&monc->mutex);
-       monc->have_osdmap = got;
-       monc->want_next_osdmap = 0;
+       __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
-       return 0;
 }
+EXPORT_SYMBOL(ceph_monc_got_map);
 
 /*
  * Register interest in the next osdmap
  */
 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 {
-       dout("request_next_osdmap have %u\n", monc->have_osdmap);
+       dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
        mutex_lock(&monc->mutex);
-       if (!monc->want_next_osdmap)
-               monc->want_next_osdmap = 1;
-       if (monc->want_next_osdmap < 2)
+       if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
+                                monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
                __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
 }
@@ -320,15 +392,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
        long ret;
 
        mutex_lock(&monc->mutex);
-       while (monc->have_osdmap < epoch) {
+       while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);
 
                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;
 
                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
-                                               monc->have_osdmap >= epoch,
-                                               ceph_timeout_jiffies(timeout));
+                                    monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+                                    ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;
 
@@ -341,11 +413,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
- *
+ * Open a session with a random monitor.  Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
        mutex_lock(&monc->mutex);
+       __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+       __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
@@ -353,29 +428,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
 }
 EXPORT_SYMBOL(ceph_monc_open_session);
 
-/*
- * We require the fsid and global_id in order to initialize our
- * debugfs dir.
- */
-static bool have_debugfs_info(struct ceph_mon_client *monc)
-{
-       dout("have_debugfs_info fsid %d globalid %lld\n",
-            (int)monc->client->have_fsid, monc->auth->global_id);
-       return monc->client->have_fsid && monc->auth->global_id > 0;
-}
-
 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
 {
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;
-       int had_debugfs_info, init_debugfs = 0;
 
        mutex_lock(&monc->mutex);
 
-       had_debugfs_info = have_debugfs_info(monc);
-
        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;
@@ -395,29 +456,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
        client->monc.monmap = monmap;
        kfree(old);
 
-       if (!client->have_fsid) {
-               client->have_fsid = true;
-               if (!had_debugfs_info && have_debugfs_info(monc)) {
-                       pr_info("client%lld fsid %pU\n",
-                               ceph_client_id(monc->client),
-                               &monc->client->fsid);
-                       init_debugfs = 1;
-               }
-               mutex_unlock(&monc->mutex);
-
-               if (init_debugfs) {
-                       /*
-                        * do debugfs initialization without mutex to avoid
-                        * creating a locking dependency
-                        */
-                       ceph_debugfs_client_init(monc->client);
-               }
+       __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
+       client->have_fsid = true;
 
-               goto out_unlocked;
-       }
 out:
        mutex_unlock(&monc->mutex);
-out_unlocked:
        wake_up_all(&client->auth_wq);
 }
 
@@ -748,10 +791,9 @@ static void delayed_work(struct work_struct *work)
                __close_session(monc);
                __open_session(monc);  /* continue hunting */
        } else {
-               struct ceph_options *opt = monc->client->options;
                int is_auth = ceph_auth_is_authenticated(monc->auth);
                if (ceph_con_keepalive_expired(&monc->con,
-                                              opt->monc_ping_timeout)) {
+                                              CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        __close_session(monc);
@@ -764,8 +806,14 @@ static void delayed_work(struct work_struct *work)
                        __validate_auth(monc);
                }
 
-               if (is_auth)
-                       __send_subscribe(monc);
+               if (is_auth) {
+                       unsigned long now = jiffies;
+
+                       dout("%s renew subs? now %lu renew after %lu\n",
+                            __func__, now, monc->sub_renew_after);
+                       if (time_after_eq(now, monc->sub_renew_after))
+                               __send_subscribe(monc);
+               }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
@@ -854,16 +902,15 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
        monc->cur_mon = -1;
        monc->hunting = true;
        monc->sub_renew_after = jiffies;
-       monc->sub_sent = 0;
+       monc->sub_renew_sent = 0;
+       monc->had_a_connection = false;
+       monc->hunt_mult = 1;
 
        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->num_generic_requests = 0;
        monc->last_tid = 0;
 
-       monc->have_mdsmap = 0;
-       monc->have_osdmap = 0;
-       monc->want_next_osdmap = 1;
        return 0;
 
 out_auth_reply:
@@ -888,7 +935,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 
        mutex_lock(&monc->mutex);
        __close_session(monc);
-
+       monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);
 
        /*
@@ -910,26 +957,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 }
 EXPORT_SYMBOL(ceph_monc_stop);
 
+static void finish_hunting(struct ceph_mon_client *monc)
+{
+       if (monc->hunting) {
+               dout("%s found mon%d\n", __func__, monc->cur_mon);
+               monc->hunting = false;
+               monc->had_a_connection = true;
+               monc->hunt_mult /= 2; /* reduce by 50% */
+               if (monc->hunt_mult < 1)
+                       monc->hunt_mult = 1;
+       }
+}
+
 static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
 {
        int ret;
        int was_auth = 0;
-       int had_debugfs_info, init_debugfs = 0;
 
        mutex_lock(&monc->mutex);
-       had_debugfs_info = have_debugfs_info(monc);
        was_auth = ceph_auth_is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
+       if (ret > 0) {
+               __send_prepared_auth_request(monc, ret);
+               goto out;
+       }
+
+       finish_hunting(monc);
+
        if (ret < 0) {
                monc->client->auth_err = ret;
-               wake_up_all(&monc->client->auth_wq);
-       } else if (ret > 0) {
-               __send_prepared_auth_request(monc, ret);
        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");
 
@@ -939,23 +1000,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 
                __send_subscribe(monc);
                __resend_generic_request(monc);
-       }
 
-       if (!had_debugfs_info && have_debugfs_info(monc)) {
-               pr_info("client%lld fsid %pU\n",
-                       ceph_client_id(monc->client),
-                       &monc->client->fsid);
-               init_debugfs = 1;
+               pr_info("mon%d %s session established\n", monc->cur_mon,
+                       ceph_pr_addr(&monc->con.peer_addr.in_addr));
        }
-       mutex_unlock(&monc->mutex);
 
-       if (init_debugfs) {
-               /*
-                * do debugfs initialization without mutex to avoid
-                * creating a locking dependency
-                */
-               ceph_debugfs_client_init(monc->client);
-       }
+out:
+       mutex_unlock(&monc->mutex);
+       if (monc->client->auth_err < 0)
+               wake_up_all(&monc->client->auth_wq);
 }
 
 static int __validate_auth(struct ceph_mon_client *monc)