libceph: pick a different monitor when reconnecting
[linux-2.6-block.git] / net / ceph / mon_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/types.h>
5 #include <linux/slab.h>
6 #include <linux/random.h>
7 #include <linux/sched.h>
8
9 #include <linux/ceph/mon_client.h>
10 #include <linux/ceph/libceph.h>
11 #include <linux/ceph/debugfs.h>
12 #include <linux/ceph/decode.h>
13 #include <linux/ceph/auth.h>
14
15 /*
16  * Interact with Ceph monitor cluster.  Handle requests for new map
17  * versions, and periodically resend as needed.  Also implement
18  * statfs() and umount().
19  *
20  * A small cluster of Ceph "monitors" are responsible for managing critical
21  * cluster configuration and state information.  An odd number (e.g., 3, 5)
22  * of cmon daemons use a modified version of the Paxos part-time parliament
23  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
24  * list of clients who have mounted the file system.
25  *
26  * We maintain an open, active session with a monitor at all times in order to
27  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
28  * TCP socket to ensure we detect a failure.  If the connection does break, we
29  * randomly hunt for a new monitor.  Once the connection is reestablished, we
30  * resend any outstanding requests.
31  */
32
33 static const struct ceph_connection_operations mon_con_ops;
34
35 static int __validate_auth(struct ceph_mon_client *monc);
36
37 /*
38  * Decode a monmap blob (e.g., during mount).
39  */
40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
41 {
42         struct ceph_monmap *m = NULL;
43         int i, err = -EINVAL;
44         struct ceph_fsid fsid;
45         u32 epoch, num_mon;
46         u16 version;
47         u32 len;
48
49         ceph_decode_32_safe(&p, end, len, bad);
50         ceph_decode_need(&p, end, len, bad);
51
52         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
53
54         ceph_decode_16_safe(&p, end, version, bad);
55
56         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
57         ceph_decode_copy(&p, &fsid, sizeof(fsid));
58         epoch = ceph_decode_32(&p);
59
60         num_mon = ceph_decode_32(&p);
61         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
62
63         if (num_mon >= CEPH_MAX_MON)
64                 goto bad;
65         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
66         if (m == NULL)
67                 return ERR_PTR(-ENOMEM);
68         m->fsid = fsid;
69         m->epoch = epoch;
70         m->num_mon = num_mon;
71         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
72         for (i = 0; i < num_mon; i++)
73                 ceph_decode_addr(&m->mon_inst[i].addr);
74
75         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
76              m->num_mon);
77         for (i = 0; i < m->num_mon; i++)
78                 dout("monmap_decode  mon%d is %s\n", i,
79                      ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
80         return m;
81
82 bad:
83         dout("monmap_decode failed with %d\n", err);
84         kfree(m);
85         return ERR_PTR(err);
86 }
87
88 /*
89  * return true if *addr is included in the monmap.
90  */
91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
92 {
93         int i;
94
95         for (i = 0; i < m->num_mon; i++)
96                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
97                         return 1;
98         return 0;
99 }
100
101 /*
102  * Send an auth request.
103  */
104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
105 {
106         monc->pending_auth = 1;
107         monc->m_auth->front.iov_len = len;
108         monc->m_auth->hdr.front_len = cpu_to_le32(len);
109         ceph_msg_revoke(monc->m_auth);
110         ceph_msg_get(monc->m_auth);  /* keep our ref */
111         ceph_con_send(&monc->con, monc->m_auth);
112 }
113
114 /*
115  * Close monitor session, if any.
116  */
117 static void __close_session(struct ceph_mon_client *monc)
118 {
119         dout("__close_session closing mon%d\n", monc->cur_mon);
120         ceph_msg_revoke(monc->m_auth);
121         ceph_msg_revoke_incoming(monc->m_auth_reply);
122         ceph_msg_revoke(monc->m_subscribe);
123         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124         ceph_con_close(&monc->con);
125
126         monc->pending_auth = 0;
127         ceph_auth_reset(monc->auth);
128 }
129
130 /*
131  * Pick a new monitor at random and set cur_mon.  If we are repicking
132  * (i.e. cur_mon is already set), be sure to pick a different one.
133  */
134 static void pick_new_mon(struct ceph_mon_client *monc)
135 {
136         int old_mon = monc->cur_mon;
137
138         BUG_ON(monc->monmap->num_mon < 1);
139
140         if (monc->monmap->num_mon == 1) {
141                 monc->cur_mon = 0;
142         } else {
143                 int max = monc->monmap->num_mon;
144                 int o = -1;
145                 int n;
146
147                 if (monc->cur_mon >= 0) {
148                         if (monc->cur_mon < monc->monmap->num_mon)
149                                 o = monc->cur_mon;
150                         if (o >= 0)
151                                 max--;
152                 }
153
154                 n = prandom_u32() % max;
155                 if (o >= 0 && n >= o)
156                         n++;
157
158                 monc->cur_mon = n;
159         }
160
161         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
162              monc->cur_mon, monc->monmap->num_mon);
163 }
164
165 /*
166  * Open a session with a new monitor.
167  */
168 static void __open_session(struct ceph_mon_client *monc)
169 {
170         int ret;
171
172         pick_new_mon(monc);
173
174         monc->sub_renew_after = jiffies; /* i.e., expired */
175         monc->sub_renew_sent = 0;
176
177         dout("%s opening mon%d\n", __func__, monc->cur_mon);
178         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
179                       &monc->monmap->mon_inst[monc->cur_mon].addr);
180
181         /*
182          * send an initial keepalive to ensure our timestamp is valid
183          * by the time we are in an OPENED state
184          */
185         ceph_con_keepalive(&monc->con);
186
187         /* initiate authentication handshake */
188         ret = ceph_auth_build_hello(monc->auth,
189                                     monc->m_auth->front.iov_base,
190                                     monc->m_auth->front_alloc_len);
191         BUG_ON(ret <= 0);
192         __send_prepared_auth_request(monc, ret);
193 }
194
195 static bool __sub_expired(struct ceph_mon_client *monc)
196 {
197         return time_after_eq(jiffies, monc->sub_renew_after);
198 }
199
200 /*
201  * Reschedule delayed work timer.
202  */
203 static void __schedule_delayed(struct ceph_mon_client *monc)
204 {
205         struct ceph_options *opt = monc->client->options;
206         unsigned long delay;
207
208         if (monc->cur_mon < 0 || __sub_expired(monc)) {
209                 delay = 10 * HZ;
210         } else {
211                 delay = 20 * HZ;
212                 if (opt->monc_ping_timeout > 0)
213                         delay = min(delay, opt->monc_ping_timeout / 3);
214         }
215         dout("__schedule_delayed after %lu\n", delay);
216         schedule_delayed_work(&monc->delayed_work,
217                               round_jiffies_relative(delay));
218 }
219
220 const char *ceph_sub_str[] = {
221         [CEPH_SUB_MDSMAP] = "mdsmap",
222         [CEPH_SUB_MONMAP] = "monmap",
223         [CEPH_SUB_OSDMAP] = "osdmap",
224 };
225
226 /*
227  * Send subscribe request for one or more maps, according to
228  * monc->subs.
229  */
230 static void __send_subscribe(struct ceph_mon_client *monc)
231 {
232         struct ceph_msg *msg = monc->m_subscribe;
233         void *p = msg->front.iov_base;
234         void *const end = p + msg->front_alloc_len;
235         int num = 0;
236         int i;
237
238         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
239
240         BUG_ON(monc->cur_mon < 0);
241
242         if (!monc->sub_renew_sent)
243                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
244
245         msg->hdr.version = cpu_to_le16(2);
246
247         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
248                 if (monc->subs[i].want)
249                         num++;
250         }
251         BUG_ON(num < 1); /* monmap sub is always there */
252         ceph_encode_32(&p, num);
253         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
254                 const char *s = ceph_sub_str[i];
255
256                 if (!monc->subs[i].want)
257                         continue;
258
259                 dout("%s %s start %llu flags 0x%x\n", __func__, s,
260                      le64_to_cpu(monc->subs[i].item.start),
261                      monc->subs[i].item.flags);
262                 ceph_encode_string(&p, end, s, strlen(s));
263                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
264                 p += sizeof(monc->subs[i].item);
265         }
266
267         BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
268         msg->front.iov_len = p - msg->front.iov_base;
269         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
270         ceph_msg_revoke(msg);
271         ceph_con_send(&monc->con, ceph_msg_get(msg));
272 }
273
274 static void handle_subscribe_ack(struct ceph_mon_client *monc,
275                                  struct ceph_msg *msg)
276 {
277         unsigned int seconds;
278         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
279
280         if (msg->front.iov_len < sizeof(*h))
281                 goto bad;
282         seconds = le32_to_cpu(h->duration);
283
284         mutex_lock(&monc->mutex);
285         if (monc->sub_renew_sent) {
286                 monc->sub_renew_after = monc->sub_renew_sent +
287                                             (seconds >> 1) * HZ - 1;
288                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
289                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
290                 monc->sub_renew_sent = 0;
291         } else {
292                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
293                      monc->sub_renew_sent, monc->sub_renew_after);
294         }
295         mutex_unlock(&monc->mutex);
296         return;
297 bad:
298         pr_err("got corrupt subscribe-ack msg\n");
299         ceph_msg_dump(msg);
300 }
301
302 /*
303  * Register interest in a map
304  *
305  * @sub: one of CEPH_SUB_*
306  * @epoch: X for "every map since X", or 0 for "just the latest"
307  */
308 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
309                                  u32 epoch, bool continuous)
310 {
311         __le64 start = cpu_to_le64(epoch);
312         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
313
314         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
315              epoch, continuous);
316
317         if (monc->subs[sub].want &&
318             monc->subs[sub].item.start == start &&
319             monc->subs[sub].item.flags == flags)
320                 return false;
321
322         monc->subs[sub].item.start = start;
323         monc->subs[sub].item.flags = flags;
324         monc->subs[sub].want = true;
325
326         return true;
327 }
328
329 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
330                         bool continuous)
331 {
332         bool need_request;
333
334         mutex_lock(&monc->mutex);
335         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
336         mutex_unlock(&monc->mutex);
337
338         return need_request;
339 }
340 EXPORT_SYMBOL(ceph_monc_want_map);
341
342 /*
343  * Keep track of which maps we have
344  *
345  * @sub: one of CEPH_SUB_*
346  */
347 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
348                                 u32 epoch)
349 {
350         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
351
352         if (monc->subs[sub].want) {
353                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
354                         monc->subs[sub].want = false;
355                 else
356                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
357         }
358
359         monc->subs[sub].have = epoch;
360 }
361
362 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
363 {
364         mutex_lock(&monc->mutex);
365         __ceph_monc_got_map(monc, sub, epoch);
366         mutex_unlock(&monc->mutex);
367 }
368 EXPORT_SYMBOL(ceph_monc_got_map);
369
370 /*
371  * Register interest in the next osdmap
372  */
373 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
374 {
375         dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
376         mutex_lock(&monc->mutex);
377         if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
378                                  monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
379                 __send_subscribe(monc);
380         mutex_unlock(&monc->mutex);
381 }
382 EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
383
384 /*
385  * Wait for an osdmap with a given epoch.
386  *
387  * @epoch: epoch to wait for
388  * @timeout: in jiffies, 0 means "wait forever"
389  */
390 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
391                           unsigned long timeout)
392 {
393         unsigned long started = jiffies;
394         long ret;
395
396         mutex_lock(&monc->mutex);
397         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
398                 mutex_unlock(&monc->mutex);
399
400                 if (timeout && time_after_eq(jiffies, started + timeout))
401                         return -ETIMEDOUT;
402
403                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
404                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
405                                      ceph_timeout_jiffies(timeout));
406                 if (ret < 0)
407                         return ret;
408
409                 mutex_lock(&monc->mutex);
410         }
411
412         mutex_unlock(&monc->mutex);
413         return 0;
414 }
415 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
416
417 /*
418  * Open a session with a random monitor.  Request monmap and osdmap,
419  * which are waited upon in __ceph_open_session().
420  */
421 int ceph_monc_open_session(struct ceph_mon_client *monc)
422 {
423         mutex_lock(&monc->mutex);
424         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
425         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
426         __open_session(monc);
427         __schedule_delayed(monc);
428         mutex_unlock(&monc->mutex);
429         return 0;
430 }
431 EXPORT_SYMBOL(ceph_monc_open_session);
432
433 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
434                                  struct ceph_msg *msg)
435 {
436         struct ceph_client *client = monc->client;
437         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
438         void *p, *end;
439
440         mutex_lock(&monc->mutex);
441
442         dout("handle_monmap\n");
443         p = msg->front.iov_base;
444         end = p + msg->front.iov_len;
445
446         monmap = ceph_monmap_decode(p, end);
447         if (IS_ERR(monmap)) {
448                 pr_err("problem decoding monmap, %d\n",
449                        (int)PTR_ERR(monmap));
450                 goto out;
451         }
452
453         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
454                 kfree(monmap);
455                 goto out;
456         }
457
458         client->monc.monmap = monmap;
459         kfree(old);
460
461         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
462         client->have_fsid = true;
463
464 out:
465         mutex_unlock(&monc->mutex);
466         wake_up_all(&client->auth_wq);
467 }
468
469 /*
470  * generic requests (currently statfs, mon_get_version)
471  */
472 static struct ceph_mon_generic_request *__lookup_generic_req(
473         struct ceph_mon_client *monc, u64 tid)
474 {
475         struct ceph_mon_generic_request *req;
476         struct rb_node *n = monc->generic_request_tree.rb_node;
477
478         while (n) {
479                 req = rb_entry(n, struct ceph_mon_generic_request, node);
480                 if (tid < req->tid)
481                         n = n->rb_left;
482                 else if (tid > req->tid)
483                         n = n->rb_right;
484                 else
485                         return req;
486         }
487         return NULL;
488 }
489
490 static void __insert_generic_request(struct ceph_mon_client *monc,
491                             struct ceph_mon_generic_request *new)
492 {
493         struct rb_node **p = &monc->generic_request_tree.rb_node;
494         struct rb_node *parent = NULL;
495         struct ceph_mon_generic_request *req = NULL;
496
497         while (*p) {
498                 parent = *p;
499                 req = rb_entry(parent, struct ceph_mon_generic_request, node);
500                 if (new->tid < req->tid)
501                         p = &(*p)->rb_left;
502                 else if (new->tid > req->tid)
503                         p = &(*p)->rb_right;
504                 else
505                         BUG();
506         }
507
508         rb_link_node(&new->node, parent, p);
509         rb_insert_color(&new->node, &monc->generic_request_tree);
510 }
511
512 static void release_generic_request(struct kref *kref)
513 {
514         struct ceph_mon_generic_request *req =
515                 container_of(kref, struct ceph_mon_generic_request, kref);
516
517         if (req->reply)
518                 ceph_msg_put(req->reply);
519         if (req->request)
520                 ceph_msg_put(req->request);
521
522         kfree(req);
523 }
524
525 static void put_generic_request(struct ceph_mon_generic_request *req)
526 {
527         kref_put(&req->kref, release_generic_request);
528 }
529
530 static void get_generic_request(struct ceph_mon_generic_request *req)
531 {
532         kref_get(&req->kref);
533 }
534
535 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
536                                          struct ceph_msg_header *hdr,
537                                          int *skip)
538 {
539         struct ceph_mon_client *monc = con->private;
540         struct ceph_mon_generic_request *req;
541         u64 tid = le64_to_cpu(hdr->tid);
542         struct ceph_msg *m;
543
544         mutex_lock(&monc->mutex);
545         req = __lookup_generic_req(monc, tid);
546         if (!req) {
547                 dout("get_generic_reply %lld dne\n", tid);
548                 *skip = 1;
549                 m = NULL;
550         } else {
551                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
552                 *skip = 0;
553                 m = ceph_msg_get(req->reply);
554                 /*
555                  * we don't need to track the connection reading into
556                  * this reply because we only have one open connection
557                  * at a time, ever.
558                  */
559         }
560         mutex_unlock(&monc->mutex);
561         return m;
562 }
563
564 static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
565                                 struct ceph_mon_generic_request *req)
566 {
567         int err;
568
569         /* register request */
570         req->tid = tid != 0 ? tid : ++monc->last_tid;
571         req->request->hdr.tid = cpu_to_le64(req->tid);
572         __insert_generic_request(monc, req);
573         monc->num_generic_requests++;
574         ceph_con_send(&monc->con, ceph_msg_get(req->request));
575         mutex_unlock(&monc->mutex);
576
577         err = wait_for_completion_interruptible(&req->completion);
578
579         mutex_lock(&monc->mutex);
580         rb_erase(&req->node, &monc->generic_request_tree);
581         monc->num_generic_requests--;
582
583         if (!err)
584                 err = req->result;
585         return err;
586 }
587
588 static int do_generic_request(struct ceph_mon_client *monc,
589                               struct ceph_mon_generic_request *req)
590 {
591         int err;
592
593         mutex_lock(&monc->mutex);
594         err = __do_generic_request(monc, 0, req);
595         mutex_unlock(&monc->mutex);
596
597         return err;
598 }
599
600 /*
601  * statfs
602  */
603 static void handle_statfs_reply(struct ceph_mon_client *monc,
604                                 struct ceph_msg *msg)
605 {
606         struct ceph_mon_generic_request *req;
607         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
608         u64 tid = le64_to_cpu(msg->hdr.tid);
609
610         if (msg->front.iov_len != sizeof(*reply))
611                 goto bad;
612         dout("handle_statfs_reply %p tid %llu\n", msg, tid);
613
614         mutex_lock(&monc->mutex);
615         req = __lookup_generic_req(monc, tid);
616         if (req) {
617                 *(struct ceph_statfs *)req->buf = reply->st;
618                 req->result = 0;
619                 get_generic_request(req);
620         }
621         mutex_unlock(&monc->mutex);
622         if (req) {
623                 complete_all(&req->completion);
624                 put_generic_request(req);
625         }
626         return;
627
628 bad:
629         pr_err("corrupt statfs reply, tid %llu\n", tid);
630         ceph_msg_dump(msg);
631 }
632
633 /*
634  * Do a synchronous statfs().
635  */
636 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
637 {
638         struct ceph_mon_generic_request *req;
639         struct ceph_mon_statfs *h;
640         int err;
641
642         req = kzalloc(sizeof(*req), GFP_NOFS);
643         if (!req)
644                 return -ENOMEM;
645
646         kref_init(&req->kref);
647         req->buf = buf;
648         init_completion(&req->completion);
649
650         err = -ENOMEM;
651         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
652                                     true);
653         if (!req->request)
654                 goto out;
655         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
656                                   true);
657         if (!req->reply)
658                 goto out;
659
660         /* fill out request */
661         h = req->request->front.iov_base;
662         h->monhdr.have_version = 0;
663         h->monhdr.session_mon = cpu_to_le16(-1);
664         h->monhdr.session_mon_tid = 0;
665         h->fsid = monc->monmap->fsid;
666
667         err = do_generic_request(monc, req);
668
669 out:
670         put_generic_request(req);
671         return err;
672 }
673 EXPORT_SYMBOL(ceph_monc_do_statfs);
674
675 static void handle_get_version_reply(struct ceph_mon_client *monc,
676                                      struct ceph_msg *msg)
677 {
678         struct ceph_mon_generic_request *req;
679         u64 tid = le64_to_cpu(msg->hdr.tid);
680         void *p = msg->front.iov_base;
681         void *end = p + msg->front_alloc_len;
682         u64 handle;
683
684         dout("%s %p tid %llu\n", __func__, msg, tid);
685
686         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
687         handle = ceph_decode_64(&p);
688         if (tid != 0 && tid != handle)
689                 goto bad;
690
691         mutex_lock(&monc->mutex);
692         req = __lookup_generic_req(monc, handle);
693         if (req) {
694                 *(u64 *)req->buf = ceph_decode_64(&p);
695                 req->result = 0;
696                 get_generic_request(req);
697         }
698         mutex_unlock(&monc->mutex);
699         if (req) {
700                 complete_all(&req->completion);
701                 put_generic_request(req);
702         }
703
704         return;
705 bad:
706         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
707         ceph_msg_dump(msg);
708 }
709
710 /*
711  * Send MMonGetVersion and wait for the reply.
712  *
713  * @what: one of "mdsmap", "osdmap" or "monmap"
714  */
715 int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
716                              u64 *newest)
717 {
718         struct ceph_mon_generic_request *req;
719         void *p, *end;
720         u64 tid;
721         int err;
722
723         req = kzalloc(sizeof(*req), GFP_NOFS);
724         if (!req)
725                 return -ENOMEM;
726
727         kref_init(&req->kref);
728         req->buf = newest;
729         init_completion(&req->completion);
730
731         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
732                                     sizeof(u64) + sizeof(u32) + strlen(what),
733                                     GFP_NOFS, true);
734         if (!req->request) {
735                 err = -ENOMEM;
736                 goto out;
737         }
738
739         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
740                                   GFP_NOFS, true);
741         if (!req->reply) {
742                 err = -ENOMEM;
743                 goto out;
744         }
745
746         p = req->request->front.iov_base;
747         end = p + req->request->front_alloc_len;
748
749         /* fill out request */
750         mutex_lock(&monc->mutex);
751         tid = ++monc->last_tid;
752         ceph_encode_64(&p, tid); /* handle */
753         ceph_encode_string(&p, end, what, strlen(what));
754
755         err = __do_generic_request(monc, tid, req);
756
757         mutex_unlock(&monc->mutex);
758 out:
759         put_generic_request(req);
760         return err;
761 }
762 EXPORT_SYMBOL(ceph_monc_do_get_version);
763
764 /*
765  * Resend pending generic requests.
766  */
767 static void __resend_generic_request(struct ceph_mon_client *monc)
768 {
769         struct ceph_mon_generic_request *req;
770         struct rb_node *p;
771
772         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
773                 req = rb_entry(p, struct ceph_mon_generic_request, node);
774                 ceph_msg_revoke(req->request);
775                 ceph_msg_revoke_incoming(req->reply);
776                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
777         }
778 }
779
780 /*
781  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
782  * renew/retry subscription as needed (in case it is timing out, or we
783  * got an ENOMEM).  And keep the monitor connection alive.
784  */
785 static void delayed_work(struct work_struct *work)
786 {
787         struct ceph_mon_client *monc =
788                 container_of(work, struct ceph_mon_client, delayed_work.work);
789
790         dout("monc delayed_work\n");
791         mutex_lock(&monc->mutex);
792         if (monc->hunting) {
793                 __close_session(monc);
794                 __open_session(monc);  /* continue hunting */
795         } else {
796                 struct ceph_options *opt = monc->client->options;
797                 int is_auth = ceph_auth_is_authenticated(monc->auth);
798                 if (ceph_con_keepalive_expired(&monc->con,
799                                                opt->monc_ping_timeout)) {
800                         dout("monc keepalive timeout\n");
801                         is_auth = 0;
802                         __close_session(monc);
803                         monc->hunting = true;
804                         __open_session(monc);
805                 }
806
807                 if (!monc->hunting) {
808                         ceph_con_keepalive(&monc->con);
809                         __validate_auth(monc);
810                 }
811
812                 if (is_auth) {
813                         unsigned long now = jiffies;
814
815                         dout("%s renew subs? now %lu renew after %lu\n",
816                              __func__, now, monc->sub_renew_after);
817                         if (time_after_eq(now, monc->sub_renew_after))
818                                 __send_subscribe(monc);
819                 }
820         }
821         __schedule_delayed(monc);
822         mutex_unlock(&monc->mutex);
823 }
824
825 /*
826  * On startup, we build a temporary monmap populated with the IPs
827  * provided by mount(2).
828  */
829 static int build_initial_monmap(struct ceph_mon_client *monc)
830 {
831         struct ceph_options *opt = monc->client->options;
832         struct ceph_entity_addr *mon_addr = opt->mon_addr;
833         int num_mon = opt->num_mon;
834         int i;
835
836         /* build initial monmap */
837         monc->monmap = kzalloc(sizeof(*monc->monmap) +
838                                num_mon*sizeof(monc->monmap->mon_inst[0]),
839                                GFP_KERNEL);
840         if (!monc->monmap)
841                 return -ENOMEM;
842         for (i = 0; i < num_mon; i++) {
843                 monc->monmap->mon_inst[i].addr = mon_addr[i];
844                 monc->monmap->mon_inst[i].addr.nonce = 0;
845                 monc->monmap->mon_inst[i].name.type =
846                         CEPH_ENTITY_TYPE_MON;
847                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
848         }
849         monc->monmap->num_mon = num_mon;
850         return 0;
851 }
852
853 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
854 {
855         int err = 0;
856
857         dout("init\n");
858         memset(monc, 0, sizeof(*monc));
859         monc->client = cl;
860         monc->monmap = NULL;
861         mutex_init(&monc->mutex);
862
863         err = build_initial_monmap(monc);
864         if (err)
865                 goto out;
866
867         /* connection */
868         /* authentication */
869         monc->auth = ceph_auth_init(cl->options->name,
870                                     cl->options->key);
871         if (IS_ERR(monc->auth)) {
872                 err = PTR_ERR(monc->auth);
873                 goto out_monmap;
874         }
875         monc->auth->want_keys =
876                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
877                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
878
879         /* msgs */
880         err = -ENOMEM;
881         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
882                                      sizeof(struct ceph_mon_subscribe_ack),
883                                      GFP_NOFS, true);
884         if (!monc->m_subscribe_ack)
885                 goto out_auth;
886
887         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
888                                          true);
889         if (!monc->m_subscribe)
890                 goto out_subscribe_ack;
891
892         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
893                                           true);
894         if (!monc->m_auth_reply)
895                 goto out_subscribe;
896
897         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
898         monc->pending_auth = 0;
899         if (!monc->m_auth)
900                 goto out_auth_reply;
901
902         ceph_con_init(&monc->con, monc, &mon_con_ops,
903                       &monc->client->msgr);
904
905         monc->cur_mon = -1;
906         monc->hunting = true;
907         monc->sub_renew_after = jiffies;
908         monc->sub_renew_sent = 0;
909
910         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
911         monc->generic_request_tree = RB_ROOT;
912         monc->num_generic_requests = 0;
913         monc->last_tid = 0;
914
915         return 0;
916
917 out_auth_reply:
918         ceph_msg_put(monc->m_auth_reply);
919 out_subscribe:
920         ceph_msg_put(monc->m_subscribe);
921 out_subscribe_ack:
922         ceph_msg_put(monc->m_subscribe_ack);
923 out_auth:
924         ceph_auth_destroy(monc->auth);
925 out_monmap:
926         kfree(monc->monmap);
927 out:
928         return err;
929 }
930 EXPORT_SYMBOL(ceph_monc_init);
931
932 void ceph_monc_stop(struct ceph_mon_client *monc)
933 {
934         dout("stop\n");
935         cancel_delayed_work_sync(&monc->delayed_work);
936
937         mutex_lock(&monc->mutex);
938         __close_session(monc);
939         monc->cur_mon = -1;
940         mutex_unlock(&monc->mutex);
941
942         /*
943          * flush msgr queue before we destroy ourselves to ensure that:
944          *  - any work that references our embedded con is finished.
945          *  - any osd_client or other work that may reference an authorizer
946          *    finishes before we shut down the auth subsystem.
947          */
948         ceph_msgr_flush();
949
950         ceph_auth_destroy(monc->auth);
951
952         ceph_msg_put(monc->m_auth);
953         ceph_msg_put(monc->m_auth_reply);
954         ceph_msg_put(monc->m_subscribe);
955         ceph_msg_put(monc->m_subscribe_ack);
956
957         kfree(monc->monmap);
958 }
959 EXPORT_SYMBOL(ceph_monc_stop);
960
961 static void finish_hunting(struct ceph_mon_client *monc)
962 {
963         if (monc->hunting) {
964                 dout("%s found mon%d\n", __func__, monc->cur_mon);
965                 monc->hunting = false;
966         }
967 }
968
969 static void handle_auth_reply(struct ceph_mon_client *monc,
970                               struct ceph_msg *msg)
971 {
972         int ret;
973         int was_auth = 0;
974
975         mutex_lock(&monc->mutex);
976         was_auth = ceph_auth_is_authenticated(monc->auth);
977         monc->pending_auth = 0;
978         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
979                                      msg->front.iov_len,
980                                      monc->m_auth->front.iov_base,
981                                      monc->m_auth->front_alloc_len);
982         if (ret > 0) {
983                 __send_prepared_auth_request(monc, ret);
984                 goto out;
985         }
986
987         finish_hunting(monc);
988
989         if (ret < 0) {
990                 monc->client->auth_err = ret;
991         } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
992                 dout("authenticated, starting session\n");
993
994                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
995                 monc->client->msgr.inst.name.num =
996                                         cpu_to_le64(monc->auth->global_id);
997
998                 __send_subscribe(monc);
999                 __resend_generic_request(monc);
1000
1001                 pr_info("mon%d %s session established\n", monc->cur_mon,
1002                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1003         }
1004
1005 out:
1006         mutex_unlock(&monc->mutex);
1007         if (monc->client->auth_err < 0)
1008                 wake_up_all(&monc->client->auth_wq);
1009 }
1010
1011 static int __validate_auth(struct ceph_mon_client *monc)
1012 {
1013         int ret;
1014
1015         if (monc->pending_auth)
1016                 return 0;
1017
1018         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1019                               monc->m_auth->front_alloc_len);
1020         if (ret <= 0)
1021                 return ret; /* either an error, or no need to authenticate */
1022         __send_prepared_auth_request(monc, ret);
1023         return 0;
1024 }
1025
1026 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1027 {
1028         int ret;
1029
1030         mutex_lock(&monc->mutex);
1031         ret = __validate_auth(monc);
1032         mutex_unlock(&monc->mutex);
1033         return ret;
1034 }
1035 EXPORT_SYMBOL(ceph_monc_validate_auth);
1036
1037 /*
1038  * handle incoming message
1039  */
1040 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1041 {
1042         struct ceph_mon_client *monc = con->private;
1043         int type = le16_to_cpu(msg->hdr.type);
1044
1045         if (!monc)
1046                 return;
1047
1048         switch (type) {
1049         case CEPH_MSG_AUTH_REPLY:
1050                 handle_auth_reply(monc, msg);
1051                 break;
1052
1053         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1054                 handle_subscribe_ack(monc, msg);
1055                 break;
1056
1057         case CEPH_MSG_STATFS_REPLY:
1058                 handle_statfs_reply(monc, msg);
1059                 break;
1060
1061         case CEPH_MSG_MON_GET_VERSION_REPLY:
1062                 handle_get_version_reply(monc, msg);
1063                 break;
1064
1065         case CEPH_MSG_MON_MAP:
1066                 ceph_monc_handle_map(monc, msg);
1067                 break;
1068
1069         case CEPH_MSG_OSD_MAP:
1070                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1071                 break;
1072
1073         default:
1074                 /* can the chained handler handle it? */
1075                 if (monc->client->extra_mon_dispatch &&
1076                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1077                         break;
1078                         
1079                 pr_err("received unknown message type %d %s\n", type,
1080                        ceph_msg_type_name(type));
1081         }
1082         ceph_msg_put(msg);
1083 }
1084
1085 /*
1086  * Allocate memory for incoming message
1087  */
1088 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1089                                       struct ceph_msg_header *hdr,
1090                                       int *skip)
1091 {
1092         struct ceph_mon_client *monc = con->private;
1093         int type = le16_to_cpu(hdr->type);
1094         int front_len = le32_to_cpu(hdr->front_len);
1095         struct ceph_msg *m = NULL;
1096
1097         *skip = 0;
1098
1099         switch (type) {
1100         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1101                 m = ceph_msg_get(monc->m_subscribe_ack);
1102                 break;
1103         case CEPH_MSG_STATFS_REPLY:
1104                 return get_generic_reply(con, hdr, skip);
1105         case CEPH_MSG_AUTH_REPLY:
1106                 m = ceph_msg_get(monc->m_auth_reply);
1107                 break;
1108         case CEPH_MSG_MON_GET_VERSION_REPLY:
1109                 if (le64_to_cpu(hdr->tid) != 0)
1110                         return get_generic_reply(con, hdr, skip);
1111
1112                 /*
1113                  * Older OSDs don't set reply tid even if the orignal
1114                  * request had a non-zero tid.  Workaround this weirdness
1115                  * by falling through to the allocate case.
1116                  */
1117         case CEPH_MSG_MON_MAP:
1118         case CEPH_MSG_MDS_MAP:
1119         case CEPH_MSG_OSD_MAP:
1120                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1121                 if (!m)
1122                         return NULL;    /* ENOMEM--return skip == 0 */
1123                 break;
1124         }
1125
1126         if (!m) {
1127                 pr_info("alloc_msg unknown type %d\n", type);
1128                 *skip = 1;
1129         } else if (front_len > m->front_alloc_len) {
1130                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1131                         front_len, m->front_alloc_len,
1132                         (unsigned int)con->peer_name.type,
1133                         le64_to_cpu(con->peer_name.num));
1134                 ceph_msg_put(m);
1135                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1136         }
1137
1138         return m;
1139 }
1140
1141 /*
1142  * If the monitor connection resets, pick a new monitor and resubmit
1143  * any pending requests.
1144  */
1145 static void mon_fault(struct ceph_connection *con)
1146 {
1147         struct ceph_mon_client *monc = con->private;
1148
1149         if (!monc)
1150                 return;
1151
1152         dout("mon_fault\n");
1153         mutex_lock(&monc->mutex);
1154         if (!con->private)
1155                 goto out;
1156
1157         if (!monc->hunting)
1158                 pr_info("mon%d %s session lost, "
1159                         "hunting for new mon\n", monc->cur_mon,
1160                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1161
1162         __close_session(monc);
1163         if (!monc->hunting) {
1164                 /* start hunting */
1165                 monc->hunting = true;
1166                 __open_session(monc);
1167         } else {
1168                 /* already hunting, let's wait a bit */
1169                 __schedule_delayed(monc);
1170         }
1171 out:
1172         mutex_unlock(&monc->mutex);
1173 }
1174
1175 /*
1176  * We can ignore refcounting on the connection struct, as all references
1177  * will come from the messenger workqueue, which is drained prior to
1178  * mon_client destruction.
1179  */
1180 static struct ceph_connection *con_get(struct ceph_connection *con)
1181 {
1182         return con;
1183 }
1184
1185 static void con_put(struct ceph_connection *con)
1186 {
1187 }
1188
1189 static const struct ceph_connection_operations mon_con_ops = {
1190         .get = con_get,
1191         .put = con_put,
1192         .dispatch = dispatch,
1193         .fault = mon_fault,
1194         .alloc_msg = mon_alloc_msg,
1195 };