Merge commit 'v2.6.34-rc2' into perf/core
[linux-2.6-block.git] / fs / ceph / mon_client.c
1 #include "ceph_debug.h"
2
3 #include <linux/types.h>
4 #include <linux/random.h>
5 #include <linux/sched.h>
6
7 #include "mon_client.h"
8 #include "super.h"
9 #include "auth.h"
10 #include "decode.h"
11
12 /*
13  * Interact with Ceph monitor cluster.  Handle requests for new map
14  * versions, and periodically resend as needed.  Also implement
15  * statfs() and umount().
16  *
17  * A small cluster of Ceph "monitors" are responsible for managing critical
18  * cluster configuration and state information.  An odd number (e.g., 3, 5)
19  * of cmon daemons use a modified version of the Paxos part-time parliament
20  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
21  * list of clients who have mounted the file system.
22  *
23  * We maintain an open, active session with a monitor at all times in order to
24  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
25  * TCP socket to ensure we detect a failure.  If the connection does break, we
26  * randomly hunt for a new monitor.  Once the connection is reestablished, we
27  * resend any outstanding requests.
28  */
29
30 const static struct ceph_connection_operations mon_con_ops;
31
32 static int __validate_auth(struct ceph_mon_client *monc);
33
34 /*
35  * Decode a monmap blob (e.g., during mount).
36  */
37 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
38 {
39         struct ceph_monmap *m = NULL;
40         int i, err = -EINVAL;
41         struct ceph_fsid fsid;
42         u32 epoch, num_mon;
43         u16 version;
44         u32 len;
45
46         ceph_decode_32_safe(&p, end, len, bad);
47         ceph_decode_need(&p, end, len, bad);
48
49         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
50
51         ceph_decode_16_safe(&p, end, version, bad);
52
53         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
54         ceph_decode_copy(&p, &fsid, sizeof(fsid));
55         epoch = ceph_decode_32(&p);
56
57         num_mon = ceph_decode_32(&p);
58         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
59
60         if (num_mon >= CEPH_MAX_MON)
61                 goto bad;
62         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
63         if (m == NULL)
64                 return ERR_PTR(-ENOMEM);
65         m->fsid = fsid;
66         m->epoch = epoch;
67         m->num_mon = num_mon;
68         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
69         for (i = 0; i < num_mon; i++)
70                 ceph_decode_addr(&m->mon_inst[i].addr);
71
72         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
73              m->num_mon);
74         for (i = 0; i < m->num_mon; i++)
75                 dout("monmap_decode  mon%d is %s\n", i,
76                      pr_addr(&m->mon_inst[i].addr.in_addr));
77         return m;
78
79 bad:
80         dout("monmap_decode failed with %d\n", err);
81         kfree(m);
82         return ERR_PTR(err);
83 }
84
85 /*
86  * return true if *addr is included in the monmap.
87  */
88 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
89 {
90         int i;
91
92         for (i = 0; i < m->num_mon; i++)
93                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
94                         return 1;
95         return 0;
96 }
97
98 /*
99  * Send an auth request.
100  */
101 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
102 {
103         monc->pending_auth = 1;
104         monc->m_auth->front.iov_len = len;
105         monc->m_auth->hdr.front_len = cpu_to_le32(len);
106         ceph_msg_get(monc->m_auth);  /* keep our ref */
107         ceph_con_send(monc->con, monc->m_auth);
108 }
109
110 /*
111  * Close monitor session, if any.
112  */
113 static void __close_session(struct ceph_mon_client *monc)
114 {
115         if (monc->con) {
116                 dout("__close_session closing mon%d\n", monc->cur_mon);
117                 ceph_con_revoke(monc->con, monc->m_auth);
118                 ceph_con_close(monc->con);
119                 monc->cur_mon = -1;
120                 monc->pending_auth = 0;
121                 ceph_auth_reset(monc->auth);
122         }
123 }
124
125 /*
126  * Open a session with a (new) monitor.
127  */
128 static int __open_session(struct ceph_mon_client *monc)
129 {
130         char r;
131         int ret;
132
133         if (monc->cur_mon < 0) {
134                 get_random_bytes(&r, 1);
135                 monc->cur_mon = r % monc->monmap->num_mon;
136                 dout("open_session num=%d r=%d -> mon%d\n",
137                      monc->monmap->num_mon, r, monc->cur_mon);
138                 monc->sub_sent = 0;
139                 monc->sub_renew_after = jiffies;  /* i.e., expired */
140                 monc->want_next_osdmap = !!monc->want_next_osdmap;
141
142                 dout("open_session mon%d opening\n", monc->cur_mon);
143                 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
144                 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
145                 ceph_con_open(monc->con,
146                               &monc->monmap->mon_inst[monc->cur_mon].addr);
147
148                 /* initiatiate authentication handshake */
149                 ret = ceph_auth_build_hello(monc->auth,
150                                             monc->m_auth->front.iov_base,
151                                             monc->m_auth->front_max);
152                 __send_prepared_auth_request(monc, ret);
153         } else {
154                 dout("open_session mon%d already open\n", monc->cur_mon);
155         }
156         return 0;
157 }
158
159 static bool __sub_expired(struct ceph_mon_client *monc)
160 {
161         return time_after_eq(jiffies, monc->sub_renew_after);
162 }
163
164 /*
165  * Reschedule delayed work timer.
166  */
167 static void __schedule_delayed(struct ceph_mon_client *monc)
168 {
169         unsigned delay;
170
171         if (monc->cur_mon < 0 || __sub_expired(monc))
172                 delay = 10 * HZ;
173         else
174                 delay = 20 * HZ;
175         dout("__schedule_delayed after %u\n", delay);
176         schedule_delayed_work(&monc->delayed_work, delay);
177 }
178
179 /*
180  * Send subscribe request for mdsmap and/or osdmap.
181  */
182 static void __send_subscribe(struct ceph_mon_client *monc)
183 {
184         dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
185              (unsigned)monc->sub_sent, __sub_expired(monc),
186              monc->want_next_osdmap);
187         if ((__sub_expired(monc) && !monc->sub_sent) ||
188             monc->want_next_osdmap == 1) {
189                 struct ceph_msg *msg;
190                 struct ceph_mon_subscribe_item *i;
191                 void *p, *end;
192
193                 msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL);
194                 if (!msg)
195                         return;
196
197                 p = msg->front.iov_base;
198                 end = p + msg->front.iov_len;
199
200                 dout("__send_subscribe to 'mdsmap' %u+\n",
201                      (unsigned)monc->have_mdsmap);
202                 if (monc->want_next_osdmap) {
203                         dout("__send_subscribe to 'osdmap' %u\n",
204                              (unsigned)monc->have_osdmap);
205                         ceph_encode_32(&p, 3);
206                         ceph_encode_string(&p, end, "osdmap", 6);
207                         i = p;
208                         i->have = cpu_to_le64(monc->have_osdmap);
209                         i->onetime = 1;
210                         p += sizeof(*i);
211                         monc->want_next_osdmap = 2;  /* requested */
212                 } else {
213                         ceph_encode_32(&p, 2);
214                 }
215                 ceph_encode_string(&p, end, "mdsmap", 6);
216                 i = p;
217                 i->have = cpu_to_le64(monc->have_mdsmap);
218                 i->onetime = 0;
219                 p += sizeof(*i);
220                 ceph_encode_string(&p, end, "monmap", 6);
221                 i = p;
222                 i->have = 0;
223                 i->onetime = 0;
224                 p += sizeof(*i);
225
226                 msg->front.iov_len = p - msg->front.iov_base;
227                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
228                 ceph_con_send(monc->con, msg);
229
230                 monc->sub_sent = jiffies | 1;  /* never 0 */
231         }
232 }
233
234 static void handle_subscribe_ack(struct ceph_mon_client *monc,
235                                  struct ceph_msg *msg)
236 {
237         unsigned seconds;
238         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
239
240         if (msg->front.iov_len < sizeof(*h))
241                 goto bad;
242         seconds = le32_to_cpu(h->duration);
243
244         mutex_lock(&monc->mutex);
245         if (monc->hunting) {
246                 pr_info("mon%d %s session established\n",
247                         monc->cur_mon, pr_addr(&monc->con->peer_addr.in_addr));
248                 monc->hunting = false;
249         }
250         dout("handle_subscribe_ack after %d seconds\n", seconds);
251         monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
252         monc->sub_sent = 0;
253         mutex_unlock(&monc->mutex);
254         return;
255 bad:
256         pr_err("got corrupt subscribe-ack msg\n");
257         ceph_msg_dump(msg);
258 }
259
260 /*
261  * Keep track of which maps we have
262  */
263 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
264 {
265         mutex_lock(&monc->mutex);
266         monc->have_mdsmap = got;
267         mutex_unlock(&monc->mutex);
268         return 0;
269 }
270
271 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
272 {
273         mutex_lock(&monc->mutex);
274         monc->have_osdmap = got;
275         monc->want_next_osdmap = 0;
276         mutex_unlock(&monc->mutex);
277         return 0;
278 }
279
280 /*
281  * Register interest in the next osdmap
282  */
283 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
284 {
285         dout("request_next_osdmap have %u\n", monc->have_osdmap);
286         mutex_lock(&monc->mutex);
287         if (!monc->want_next_osdmap)
288                 monc->want_next_osdmap = 1;
289         if (monc->want_next_osdmap < 2)
290                 __send_subscribe(monc);
291         mutex_unlock(&monc->mutex);
292 }
293
294 /*
295  *
296  */
297 int ceph_monc_open_session(struct ceph_mon_client *monc)
298 {
299         if (!monc->con) {
300                 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
301                 if (!monc->con)
302                         return -ENOMEM;
303                 ceph_con_init(monc->client->msgr, monc->con);
304                 monc->con->private = monc;
305                 monc->con->ops = &mon_con_ops;
306         }
307
308         mutex_lock(&monc->mutex);
309         __open_session(monc);
310         __schedule_delayed(monc);
311         mutex_unlock(&monc->mutex);
312         return 0;
313 }
314
315 /*
316  * The monitor responds with mount ack indicate mount success.  The
317  * included client ticket allows the client to talk to MDSs and OSDs.
318  */
319 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
320                                  struct ceph_msg *msg)
321 {
322         struct ceph_client *client = monc->client;
323         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
324         void *p, *end;
325
326         mutex_lock(&monc->mutex);
327
328         dout("handle_monmap\n");
329         p = msg->front.iov_base;
330         end = p + msg->front.iov_len;
331
332         monmap = ceph_monmap_decode(p, end);
333         if (IS_ERR(monmap)) {
334                 pr_err("problem decoding monmap, %d\n",
335                        (int)PTR_ERR(monmap));
336                 goto out;
337         }
338
339         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
340                 kfree(monmap);
341                 goto out;
342         }
343
344         client->monc.monmap = monmap;
345         kfree(old);
346
347 out:
348         mutex_unlock(&monc->mutex);
349         wake_up(&client->auth_wq);
350 }
351
352 /*
353  * statfs
354  */
355 static struct ceph_mon_statfs_request *__lookup_statfs(
356         struct ceph_mon_client *monc, u64 tid)
357 {
358         struct ceph_mon_statfs_request *req;
359         struct rb_node *n = monc->statfs_request_tree.rb_node;
360
361         while (n) {
362                 req = rb_entry(n, struct ceph_mon_statfs_request, node);
363                 if (tid < req->tid)
364                         n = n->rb_left;
365                 else if (tid > req->tid)
366                         n = n->rb_right;
367                 else
368                         return req;
369         }
370         return NULL;
371 }
372
373 static void __insert_statfs(struct ceph_mon_client *monc,
374                             struct ceph_mon_statfs_request *new)
375 {
376         struct rb_node **p = &monc->statfs_request_tree.rb_node;
377         struct rb_node *parent = NULL;
378         struct ceph_mon_statfs_request *req = NULL;
379
380         while (*p) {
381                 parent = *p;
382                 req = rb_entry(parent, struct ceph_mon_statfs_request, node);
383                 if (new->tid < req->tid)
384                         p = &(*p)->rb_left;
385                 else if (new->tid > req->tid)
386                         p = &(*p)->rb_right;
387                 else
388                         BUG();
389         }
390
391         rb_link_node(&new->node, parent, p);
392         rb_insert_color(&new->node, &monc->statfs_request_tree);
393 }
394
395 static void handle_statfs_reply(struct ceph_mon_client *monc,
396                                 struct ceph_msg *msg)
397 {
398         struct ceph_mon_statfs_request *req;
399         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
400         u64 tid;
401
402         if (msg->front.iov_len != sizeof(*reply))
403                 goto bad;
404         tid = le64_to_cpu(msg->hdr.tid);
405         dout("handle_statfs_reply %p tid %llu\n", msg, tid);
406
407         mutex_lock(&monc->mutex);
408         req = __lookup_statfs(monc, tid);
409         if (req) {
410                 *req->buf = reply->st;
411                 req->result = 0;
412         }
413         mutex_unlock(&monc->mutex);
414         if (req)
415                 complete(&req->completion);
416         return;
417
418 bad:
419         pr_err("corrupt statfs reply, no tid\n");
420         ceph_msg_dump(msg);
421 }
422
423 /*
424  * (re)send a statfs request
425  */
426 static int send_statfs(struct ceph_mon_client *monc,
427                        struct ceph_mon_statfs_request *req)
428 {
429         struct ceph_msg *msg;
430         struct ceph_mon_statfs *h;
431
432         dout("send_statfs tid %llu\n", req->tid);
433         msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
434         if (IS_ERR(msg))
435                 return PTR_ERR(msg);
436         req->request = msg;
437         msg->hdr.tid = cpu_to_le64(req->tid);
438         h = msg->front.iov_base;
439         h->monhdr.have_version = 0;
440         h->monhdr.session_mon = cpu_to_le16(-1);
441         h->monhdr.session_mon_tid = 0;
442         h->fsid = monc->monmap->fsid;
443         ceph_con_send(monc->con, msg);
444         return 0;
445 }
446
447 /*
448  * Do a synchronous statfs().
449  */
450 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
451 {
452         struct ceph_mon_statfs_request req;
453         int err;
454
455         req.buf = buf;
456         init_completion(&req.completion);
457
458         /* allocate memory for reply */
459         err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1);
460         if (err)
461                 return err;
462
463         /* register request */
464         mutex_lock(&monc->mutex);
465         req.tid = ++monc->last_tid;
466         req.last_attempt = jiffies;
467         req.delay = BASE_DELAY_INTERVAL;
468         __insert_statfs(monc, &req);
469         monc->num_statfs_requests++;
470         mutex_unlock(&monc->mutex);
471
472         /* send request and wait */
473         err = send_statfs(monc, &req);
474         if (!err)
475                 err = wait_for_completion_interruptible(&req.completion);
476
477         mutex_lock(&monc->mutex);
478         rb_erase(&req.node, &monc->statfs_request_tree);
479         monc->num_statfs_requests--;
480         ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1);
481         mutex_unlock(&monc->mutex);
482
483         if (!err)
484                 err = req.result;
485         return err;
486 }
487
488 /*
489  * Resend pending statfs requests.
490  */
491 static void __resend_statfs(struct ceph_mon_client *monc)
492 {
493         struct ceph_mon_statfs_request *req;
494         struct rb_node *p;
495
496         for (p = rb_first(&monc->statfs_request_tree); p; p = rb_next(p)) {
497                 req = rb_entry(p, struct ceph_mon_statfs_request, node);
498                 send_statfs(monc, req);
499         }
500 }
501
502 /*
503  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
504  * renew/retry subscription as needed (in case it is timing out, or we
505  * got an ENOMEM).  And keep the monitor connection alive.
506  */
507 static void delayed_work(struct work_struct *work)
508 {
509         struct ceph_mon_client *monc =
510                 container_of(work, struct ceph_mon_client, delayed_work.work);
511
512         dout("monc delayed_work\n");
513         mutex_lock(&monc->mutex);
514         if (monc->hunting) {
515                 __close_session(monc);
516                 __open_session(monc);  /* continue hunting */
517         } else {
518                 ceph_con_keepalive(monc->con);
519
520                 __validate_auth(monc);
521
522                 if (monc->auth->ops->is_authenticated(monc->auth))
523                         __send_subscribe(monc);
524         }
525         __schedule_delayed(monc);
526         mutex_unlock(&monc->mutex);
527 }
528
529 /*
530  * On startup, we build a temporary monmap populated with the IPs
531  * provided by mount(2).
532  */
533 static int build_initial_monmap(struct ceph_mon_client *monc)
534 {
535         struct ceph_mount_args *args = monc->client->mount_args;
536         struct ceph_entity_addr *mon_addr = args->mon_addr;
537         int num_mon = args->num_mon;
538         int i;
539
540         /* build initial monmap */
541         monc->monmap = kzalloc(sizeof(*monc->monmap) +
542                                num_mon*sizeof(monc->monmap->mon_inst[0]),
543                                GFP_KERNEL);
544         if (!monc->monmap)
545                 return -ENOMEM;
546         for (i = 0; i < num_mon; i++) {
547                 monc->monmap->mon_inst[i].addr = mon_addr[i];
548                 monc->monmap->mon_inst[i].addr.nonce = 0;
549                 monc->monmap->mon_inst[i].name.type =
550                         CEPH_ENTITY_TYPE_MON;
551                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
552         }
553         monc->monmap->num_mon = num_mon;
554         monc->have_fsid = false;
555
556         /* release addr memory */
557         kfree(args->mon_addr);
558         args->mon_addr = NULL;
559         args->num_mon = 0;
560         return 0;
561 }
562
563 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
564 {
565         int err = 0;
566
567         dout("init\n");
568         memset(monc, 0, sizeof(*monc));
569         monc->client = cl;
570         monc->monmap = NULL;
571         mutex_init(&monc->mutex);
572
573         err = build_initial_monmap(monc);
574         if (err)
575                 goto out;
576
577         monc->con = NULL;
578
579         /* authentication */
580         monc->auth = ceph_auth_init(cl->mount_args->name,
581                                     cl->mount_args->secret);
582         if (IS_ERR(monc->auth))
583                 return PTR_ERR(monc->auth);
584         monc->auth->want_keys =
585                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
586                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
587
588         /* msg pools */
589         err = ceph_msgpool_init(&monc->msgpool_subscribe_ack,
590                                sizeof(struct ceph_mon_subscribe_ack), 1, false);
591         if (err < 0)
592                 goto out_monmap;
593         err = ceph_msgpool_init(&monc->msgpool_statfs_reply,
594                                 sizeof(struct ceph_mon_statfs_reply), 0, false);
595         if (err < 0)
596                 goto out_pool1;
597         err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false);
598         if (err < 0)
599                 goto out_pool2;
600
601         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL);
602         monc->pending_auth = 0;
603         if (IS_ERR(monc->m_auth)) {
604                 err = PTR_ERR(monc->m_auth);
605                 monc->m_auth = NULL;
606                 goto out_pool3;
607         }
608
609         monc->cur_mon = -1;
610         monc->hunting = true;
611         monc->sub_renew_after = jiffies;
612         monc->sub_sent = 0;
613
614         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
615         monc->statfs_request_tree = RB_ROOT;
616         monc->num_statfs_requests = 0;
617         monc->last_tid = 0;
618
619         monc->have_mdsmap = 0;
620         monc->have_osdmap = 0;
621         monc->want_next_osdmap = 1;
622         return 0;
623
624 out_pool3:
625         ceph_msgpool_destroy(&monc->msgpool_auth_reply);
626 out_pool2:
627         ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
628 out_pool1:
629         ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
630 out_monmap:
631         kfree(monc->monmap);
632 out:
633         return err;
634 }
635
636 void ceph_monc_stop(struct ceph_mon_client *monc)
637 {
638         dout("stop\n");
639         cancel_delayed_work_sync(&monc->delayed_work);
640
641         mutex_lock(&monc->mutex);
642         __close_session(monc);
643         if (monc->con) {
644                 monc->con->private = NULL;
645                 monc->con->ops->put(monc->con);
646                 monc->con = NULL;
647         }
648         mutex_unlock(&monc->mutex);
649
650         ceph_auth_destroy(monc->auth);
651
652         ceph_msg_put(monc->m_auth);
653         ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
654         ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
655         ceph_msgpool_destroy(&monc->msgpool_auth_reply);
656
657         kfree(monc->monmap);
658 }
659
660 static void handle_auth_reply(struct ceph_mon_client *monc,
661                               struct ceph_msg *msg)
662 {
663         int ret;
664
665         mutex_lock(&monc->mutex);
666         monc->pending_auth = 0;
667         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
668                                      msg->front.iov_len,
669                                      monc->m_auth->front.iov_base,
670                                      monc->m_auth->front_max);
671         if (ret < 0) {
672                 monc->client->auth_err = ret;
673                 wake_up(&monc->client->auth_wq);
674         } else if (ret > 0) {
675                 __send_prepared_auth_request(monc, ret);
676         } else if (monc->auth->ops->is_authenticated(monc->auth)) {
677                 dout("authenticated, starting session\n");
678
679                 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
680                 monc->client->msgr->inst.name.num = monc->auth->global_id;
681
682                 __send_subscribe(monc);
683                 __resend_statfs(monc);
684         }
685         mutex_unlock(&monc->mutex);
686 }
687
688 static int __validate_auth(struct ceph_mon_client *monc)
689 {
690         int ret;
691
692         if (monc->pending_auth)
693                 return 0;
694
695         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
696                               monc->m_auth->front_max);
697         if (ret <= 0)
698                 return ret; /* either an error, or no need to authenticate */
699         __send_prepared_auth_request(monc, ret);
700         return 0;
701 }
702
703 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
704 {
705         int ret;
706
707         mutex_lock(&monc->mutex);
708         ret = __validate_auth(monc);
709         mutex_unlock(&monc->mutex);
710         return ret;
711 }
712
713 /*
714  * handle incoming message
715  */
716 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
717 {
718         struct ceph_mon_client *monc = con->private;
719         int type = le16_to_cpu(msg->hdr.type);
720
721         if (!monc)
722                 return;
723
724         switch (type) {
725         case CEPH_MSG_AUTH_REPLY:
726                 handle_auth_reply(monc, msg);
727                 break;
728
729         case CEPH_MSG_MON_SUBSCRIBE_ACK:
730                 handle_subscribe_ack(monc, msg);
731                 break;
732
733         case CEPH_MSG_STATFS_REPLY:
734                 handle_statfs_reply(monc, msg);
735                 break;
736
737         case CEPH_MSG_MON_MAP:
738                 ceph_monc_handle_map(monc, msg);
739                 break;
740
741         case CEPH_MSG_MDS_MAP:
742                 ceph_mdsc_handle_map(&monc->client->mdsc, msg);
743                 break;
744
745         case CEPH_MSG_OSD_MAP:
746                 ceph_osdc_handle_map(&monc->client->osdc, msg);
747                 break;
748
749         default:
750                 pr_err("received unknown message type %d %s\n", type,
751                        ceph_msg_type_name(type));
752         }
753         ceph_msg_put(msg);
754 }
755
756 /*
757  * Allocate memory for incoming message
758  */
759 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
760                                       struct ceph_msg_header *hdr,
761                                       int *skip)
762 {
763         struct ceph_mon_client *monc = con->private;
764         int type = le16_to_cpu(hdr->type);
765         int front_len = le32_to_cpu(hdr->front_len);
766         struct ceph_msg *m = NULL;
767
768         *skip = 0;
769
770         switch (type) {
771         case CEPH_MSG_MON_SUBSCRIBE_ACK:
772                 m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
773                 break;
774         case CEPH_MSG_STATFS_REPLY:
775                 m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len);
776                 break;
777         case CEPH_MSG_AUTH_REPLY:
778                 m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
779                 break;
780         case CEPH_MSG_MON_MAP:
781         case CEPH_MSG_MDS_MAP:
782         case CEPH_MSG_OSD_MAP:
783                 m = ceph_msg_new(type, front_len, 0, 0, NULL);
784                 break;
785         }
786
787         if (!m) {
788                 pr_info("alloc_msg unknown type %d\n", type);
789                 *skip = 1;
790         }
791         return m;
792 }
793
794 /*
795  * If the monitor connection resets, pick a new monitor and resubmit
796  * any pending requests.
797  */
798 static void mon_fault(struct ceph_connection *con)
799 {
800         struct ceph_mon_client *monc = con->private;
801
802         if (!monc)
803                 return;
804
805         dout("mon_fault\n");
806         mutex_lock(&monc->mutex);
807         if (!con->private)
808                 goto out;
809
810         if (monc->con && !monc->hunting)
811                 pr_info("mon%d %s session lost, "
812                         "hunting for new mon\n", monc->cur_mon,
813                         pr_addr(&monc->con->peer_addr.in_addr));
814
815         __close_session(monc);
816         if (!monc->hunting) {
817                 /* start hunting */
818                 monc->hunting = true;
819                 __open_session(monc);
820         } else {
821                 /* already hunting, let's wait a bit */
822                 __schedule_delayed(monc);
823         }
824 out:
825         mutex_unlock(&monc->mutex);
826 }
827
828 const static struct ceph_connection_operations mon_con_ops = {
829         .get = ceph_con_get,
830         .put = ceph_con_put,
831         .dispatch = dispatch,
832         .fault = mon_fault,
833         .alloc_msg = mon_alloc_msg,
834 };