Merge branch 'etnaviv/fixes' of https://git.pengutronix.de/git/lst/linux into drm...
[linux-2.6-block.git] / net / ceph / mon_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/types.h>
5 #include <linux/slab.h>
6 #include <linux/random.h>
7 #include <linux/sched.h>
8
9 #include <linux/ceph/ceph_features.h>
10 #include <linux/ceph/mon_client.h>
11 #include <linux/ceph/libceph.h>
12 #include <linux/ceph/debugfs.h>
13 #include <linux/ceph/decode.h>
14 #include <linux/ceph/auth.h>
15
16 /*
17  * Interact with Ceph monitor cluster.  Handle requests for new map
18  * versions, and periodically resend as needed.  Also implement
19  * statfs() and umount().
20  *
21  * A small cluster of Ceph "monitors" are responsible for managing critical
22  * cluster configuration and state information.  An odd number (e.g., 3, 5)
23  * of cmon daemons use a modified version of the Paxos part-time parliament
24  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
25  * list of clients who have mounted the file system.
26  *
27  * We maintain an open, active session with a monitor at all times in order to
28  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
29  * TCP socket to ensure we detect a failure.  If the connection does break, we
30  * randomly hunt for a new monitor.  Once the connection is reestablished, we
31  * resend any outstanding requests.
32  */
33
34 static const struct ceph_connection_operations mon_con_ops;
35
36 static int __validate_auth(struct ceph_mon_client *monc);
37
38 /*
39  * Decode a monmap blob (e.g., during mount).
40  */
41 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
42 {
43         struct ceph_monmap *m = NULL;
44         int i, err = -EINVAL;
45         struct ceph_fsid fsid;
46         u32 epoch, num_mon;
47         u32 len;
48
49         ceph_decode_32_safe(&p, end, len, bad);
50         ceph_decode_need(&p, end, len, bad);
51
52         dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
53         p += sizeof(u16);  /* skip version */
54
55         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
56         ceph_decode_copy(&p, &fsid, sizeof(fsid));
57         epoch = ceph_decode_32(&p);
58
59         num_mon = ceph_decode_32(&p);
60         ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
61
62         if (num_mon >= CEPH_MAX_MON)
63                 goto bad;
64         m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
65         if (m == NULL)
66                 return ERR_PTR(-ENOMEM);
67         m->fsid = fsid;
68         m->epoch = epoch;
69         m->num_mon = num_mon;
70         ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
71         for (i = 0; i < num_mon; i++)
72                 ceph_decode_addr(&m->mon_inst[i].addr);
73
74         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
75              m->num_mon);
76         for (i = 0; i < m->num_mon; i++)
77                 dout("monmap_decode  mon%d is %s\n", i,
78                      ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
79         return m;
80
81 bad:
82         dout("monmap_decode failed with %d\n", err);
83         kfree(m);
84         return ERR_PTR(err);
85 }
86
87 /*
88  * return true if *addr is included in the monmap.
89  */
90 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
91 {
92         int i;
93
94         for (i = 0; i < m->num_mon; i++)
95                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
96                         return 1;
97         return 0;
98 }
99
100 /*
101  * Send an auth request.
102  */
103 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
104 {
105         monc->pending_auth = 1;
106         monc->m_auth->front.iov_len = len;
107         monc->m_auth->hdr.front_len = cpu_to_le32(len);
108         ceph_msg_revoke(monc->m_auth);
109         ceph_msg_get(monc->m_auth);  /* keep our ref */
110         ceph_con_send(&monc->con, monc->m_auth);
111 }
112
113 /*
114  * Close monitor session, if any.
115  */
116 static void __close_session(struct ceph_mon_client *monc)
117 {
118         dout("__close_session closing mon%d\n", monc->cur_mon);
119         ceph_msg_revoke(monc->m_auth);
120         ceph_msg_revoke_incoming(monc->m_auth_reply);
121         ceph_msg_revoke(monc->m_subscribe);
122         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
123         ceph_con_close(&monc->con);
124
125         monc->pending_auth = 0;
126         ceph_auth_reset(monc->auth);
127 }
128
129 /*
130  * Pick a new monitor at random and set cur_mon.  If we are repicking
131  * (i.e. cur_mon is already set), be sure to pick a different one.
132  */
133 static void pick_new_mon(struct ceph_mon_client *monc)
134 {
135         int old_mon = monc->cur_mon;
136
137         BUG_ON(monc->monmap->num_mon < 1);
138
139         if (monc->monmap->num_mon == 1) {
140                 monc->cur_mon = 0;
141         } else {
142                 int max = monc->monmap->num_mon;
143                 int o = -1;
144                 int n;
145
146                 if (monc->cur_mon >= 0) {
147                         if (monc->cur_mon < monc->monmap->num_mon)
148                                 o = monc->cur_mon;
149                         if (o >= 0)
150                                 max--;
151                 }
152
153                 n = prandom_u32() % max;
154                 if (o >= 0 && n >= o)
155                         n++;
156
157                 monc->cur_mon = n;
158         }
159
160         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
161              monc->cur_mon, monc->monmap->num_mon);
162 }
163
164 /*
165  * Open a session with a new monitor.
166  */
167 static void __open_session(struct ceph_mon_client *monc)
168 {
169         int ret;
170
171         pick_new_mon(monc);
172
173         monc->hunting = true;
174         if (monc->had_a_connection) {
175                 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
176                 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
177                         monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
178         }
179
180         monc->sub_renew_after = jiffies; /* i.e., expired */
181         monc->sub_renew_sent = 0;
182
183         dout("%s opening mon%d\n", __func__, monc->cur_mon);
184         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
185                       &monc->monmap->mon_inst[monc->cur_mon].addr);
186
187         /*
188          * send an initial keepalive to ensure our timestamp is valid
189          * by the time we are in an OPENED state
190          */
191         ceph_con_keepalive(&monc->con);
192
193         /* initiate authentication handshake */
194         ret = ceph_auth_build_hello(monc->auth,
195                                     monc->m_auth->front.iov_base,
196                                     monc->m_auth->front_alloc_len);
197         BUG_ON(ret <= 0);
198         __send_prepared_auth_request(monc, ret);
199 }
200
201 static void reopen_session(struct ceph_mon_client *monc)
202 {
203         if (!monc->hunting)
204                 pr_info("mon%d %s session lost, hunting for new mon\n",
205                     monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
206
207         __close_session(monc);
208         __open_session(monc);
209 }
210
211 /*
212  * Reschedule delayed work timer.
213  */
214 static void __schedule_delayed(struct ceph_mon_client *monc)
215 {
216         unsigned long delay;
217
218         if (monc->hunting)
219                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
220         else
221                 delay = CEPH_MONC_PING_INTERVAL;
222
223         dout("__schedule_delayed after %lu\n", delay);
224         mod_delayed_work(system_wq, &monc->delayed_work,
225                          round_jiffies_relative(delay));
226 }
227
228 const char *ceph_sub_str[] = {
229         [CEPH_SUB_MONMAP] = "monmap",
230         [CEPH_SUB_OSDMAP] = "osdmap",
231         [CEPH_SUB_FSMAP]  = "fsmap.user",
232         [CEPH_SUB_MDSMAP] = "mdsmap",
233 };
234
235 /*
236  * Send subscribe request for one or more maps, according to
237  * monc->subs.
238  */
239 static void __send_subscribe(struct ceph_mon_client *monc)
240 {
241         struct ceph_msg *msg = monc->m_subscribe;
242         void *p = msg->front.iov_base;
243         void *const end = p + msg->front_alloc_len;
244         int num = 0;
245         int i;
246
247         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
248
249         BUG_ON(monc->cur_mon < 0);
250
251         if (!monc->sub_renew_sent)
252                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
253
254         msg->hdr.version = cpu_to_le16(2);
255
256         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
257                 if (monc->subs[i].want)
258                         num++;
259         }
260         BUG_ON(num < 1); /* monmap sub is always there */
261         ceph_encode_32(&p, num);
262         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
263                 char buf[32];
264                 int len;
265
266                 if (!monc->subs[i].want)
267                         continue;
268
269                 len = sprintf(buf, "%s", ceph_sub_str[i]);
270                 if (i == CEPH_SUB_MDSMAP &&
271                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
272                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
273
274                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
275                      le64_to_cpu(monc->subs[i].item.start),
276                      monc->subs[i].item.flags);
277                 ceph_encode_string(&p, end, buf, len);
278                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
279                 p += sizeof(monc->subs[i].item);
280         }
281
282         BUG_ON(p > end);
283         msg->front.iov_len = p - msg->front.iov_base;
284         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
285         ceph_msg_revoke(msg);
286         ceph_con_send(&monc->con, ceph_msg_get(msg));
287 }
288
289 static void handle_subscribe_ack(struct ceph_mon_client *monc,
290                                  struct ceph_msg *msg)
291 {
292         unsigned int seconds;
293         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
294
295         if (msg->front.iov_len < sizeof(*h))
296                 goto bad;
297         seconds = le32_to_cpu(h->duration);
298
299         mutex_lock(&monc->mutex);
300         if (monc->sub_renew_sent) {
301                 /*
302                  * This is only needed for legacy (infernalis or older)
303                  * MONs -- see delayed_work().
304                  */
305                 monc->sub_renew_after = monc->sub_renew_sent +
306                                             (seconds >> 1) * HZ - 1;
307                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
308                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
309                 monc->sub_renew_sent = 0;
310         } else {
311                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
312                      monc->sub_renew_sent, monc->sub_renew_after);
313         }
314         mutex_unlock(&monc->mutex);
315         return;
316 bad:
317         pr_err("got corrupt subscribe-ack msg\n");
318         ceph_msg_dump(msg);
319 }
320
321 /*
322  * Register interest in a map
323  *
324  * @sub: one of CEPH_SUB_*
325  * @epoch: X for "every map since X", or 0 for "just the latest"
326  */
327 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
328                                  u32 epoch, bool continuous)
329 {
330         __le64 start = cpu_to_le64(epoch);
331         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
332
333         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
334              epoch, continuous);
335
336         if (monc->subs[sub].want &&
337             monc->subs[sub].item.start == start &&
338             monc->subs[sub].item.flags == flags)
339                 return false;
340
341         monc->subs[sub].item.start = start;
342         monc->subs[sub].item.flags = flags;
343         monc->subs[sub].want = true;
344
345         return true;
346 }
347
348 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
349                         bool continuous)
350 {
351         bool need_request;
352
353         mutex_lock(&monc->mutex);
354         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
355         mutex_unlock(&monc->mutex);
356
357         return need_request;
358 }
359 EXPORT_SYMBOL(ceph_monc_want_map);
360
361 /*
362  * Keep track of which maps we have
363  *
364  * @sub: one of CEPH_SUB_*
365  */
366 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
367                                 u32 epoch)
368 {
369         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
370
371         if (monc->subs[sub].want) {
372                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
373                         monc->subs[sub].want = false;
374                 else
375                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
376         }
377
378         monc->subs[sub].have = epoch;
379 }
380
381 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
382 {
383         mutex_lock(&monc->mutex);
384         __ceph_monc_got_map(monc, sub, epoch);
385         mutex_unlock(&monc->mutex);
386 }
387 EXPORT_SYMBOL(ceph_monc_got_map);
388
389 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
390 {
391         mutex_lock(&monc->mutex);
392         __send_subscribe(monc);
393         mutex_unlock(&monc->mutex);
394 }
395 EXPORT_SYMBOL(ceph_monc_renew_subs);
396
397 /*
398  * Wait for an osdmap with a given epoch.
399  *
400  * @epoch: epoch to wait for
401  * @timeout: in jiffies, 0 means "wait forever"
402  */
403 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
404                           unsigned long timeout)
405 {
406         unsigned long started = jiffies;
407         long ret;
408
409         mutex_lock(&monc->mutex);
410         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
411                 mutex_unlock(&monc->mutex);
412
413                 if (timeout && time_after_eq(jiffies, started + timeout))
414                         return -ETIMEDOUT;
415
416                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
417                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
418                                      ceph_timeout_jiffies(timeout));
419                 if (ret < 0)
420                         return ret;
421
422                 mutex_lock(&monc->mutex);
423         }
424
425         mutex_unlock(&monc->mutex);
426         return 0;
427 }
428 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
429
430 /*
431  * Open a session with a random monitor.  Request monmap and osdmap,
432  * which are waited upon in __ceph_open_session().
433  */
434 int ceph_monc_open_session(struct ceph_mon_client *monc)
435 {
436         mutex_lock(&monc->mutex);
437         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
438         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
439         __open_session(monc);
440         __schedule_delayed(monc);
441         mutex_unlock(&monc->mutex);
442         return 0;
443 }
444 EXPORT_SYMBOL(ceph_monc_open_session);
445
446 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
447                                  struct ceph_msg *msg)
448 {
449         struct ceph_client *client = monc->client;
450         struct ceph_monmap *monmap = NULL, *old = monc->monmap;
451         void *p, *end;
452
453         mutex_lock(&monc->mutex);
454
455         dout("handle_monmap\n");
456         p = msg->front.iov_base;
457         end = p + msg->front.iov_len;
458
459         monmap = ceph_monmap_decode(p, end);
460         if (IS_ERR(monmap)) {
461                 pr_err("problem decoding monmap, %d\n",
462                        (int)PTR_ERR(monmap));
463                 goto out;
464         }
465
466         if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
467                 kfree(monmap);
468                 goto out;
469         }
470
471         client->monc.monmap = monmap;
472         kfree(old);
473
474         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
475         client->have_fsid = true;
476
477 out:
478         mutex_unlock(&monc->mutex);
479         wake_up_all(&client->auth_wq);
480 }
481
482 /*
483  * generic requests (currently statfs, mon_get_version)
484  */
485 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
486
487 static void release_generic_request(struct kref *kref)
488 {
489         struct ceph_mon_generic_request *req =
490                 container_of(kref, struct ceph_mon_generic_request, kref);
491
492         dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
493              req->reply);
494         WARN_ON(!RB_EMPTY_NODE(&req->node));
495
496         if (req->reply)
497                 ceph_msg_put(req->reply);
498         if (req->request)
499                 ceph_msg_put(req->request);
500
501         kfree(req);
502 }
503
504 static void put_generic_request(struct ceph_mon_generic_request *req)
505 {
506         if (req)
507                 kref_put(&req->kref, release_generic_request);
508 }
509
510 static void get_generic_request(struct ceph_mon_generic_request *req)
511 {
512         kref_get(&req->kref);
513 }
514
515 static struct ceph_mon_generic_request *
516 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
517 {
518         struct ceph_mon_generic_request *req;
519
520         req = kzalloc(sizeof(*req), gfp);
521         if (!req)
522                 return NULL;
523
524         req->monc = monc;
525         kref_init(&req->kref);
526         RB_CLEAR_NODE(&req->node);
527         init_completion(&req->completion);
528
529         dout("%s greq %p\n", __func__, req);
530         return req;
531 }
532
533 static void register_generic_request(struct ceph_mon_generic_request *req)
534 {
535         struct ceph_mon_client *monc = req->monc;
536
537         WARN_ON(req->tid);
538
539         get_generic_request(req);
540         req->tid = ++monc->last_tid;
541         insert_generic_request(&monc->generic_request_tree, req);
542 }
543
544 static void send_generic_request(struct ceph_mon_client *monc,
545                                  struct ceph_mon_generic_request *req)
546 {
547         WARN_ON(!req->tid);
548
549         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
550         req->request->hdr.tid = cpu_to_le64(req->tid);
551         ceph_con_send(&monc->con, ceph_msg_get(req->request));
552 }
553
554 static void __finish_generic_request(struct ceph_mon_generic_request *req)
555 {
556         struct ceph_mon_client *monc = req->monc;
557
558         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
559         erase_generic_request(&monc->generic_request_tree, req);
560
561         ceph_msg_revoke(req->request);
562         ceph_msg_revoke_incoming(req->reply);
563 }
564
565 static void finish_generic_request(struct ceph_mon_generic_request *req)
566 {
567         __finish_generic_request(req);
568         put_generic_request(req);
569 }
570
571 static void complete_generic_request(struct ceph_mon_generic_request *req)
572 {
573         if (req->complete_cb)
574                 req->complete_cb(req);
575         else
576                 complete_all(&req->completion);
577         put_generic_request(req);
578 }
579
580 static void cancel_generic_request(struct ceph_mon_generic_request *req)
581 {
582         struct ceph_mon_client *monc = req->monc;
583         struct ceph_mon_generic_request *lookup_req;
584
585         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
586
587         mutex_lock(&monc->mutex);
588         lookup_req = lookup_generic_request(&monc->generic_request_tree,
589                                             req->tid);
590         if (lookup_req) {
591                 WARN_ON(lookup_req != req);
592                 finish_generic_request(req);
593         }
594
595         mutex_unlock(&monc->mutex);
596 }
597
598 static int wait_generic_request(struct ceph_mon_generic_request *req)
599 {
600         int ret;
601
602         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
603         ret = wait_for_completion_interruptible(&req->completion);
604         if (ret)
605                 cancel_generic_request(req);
606         else
607                 ret = req->result; /* completed */
608
609         return ret;
610 }
611
612 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
613                                          struct ceph_msg_header *hdr,
614                                          int *skip)
615 {
616         struct ceph_mon_client *monc = con->private;
617         struct ceph_mon_generic_request *req;
618         u64 tid = le64_to_cpu(hdr->tid);
619         struct ceph_msg *m;
620
621         mutex_lock(&monc->mutex);
622         req = lookup_generic_request(&monc->generic_request_tree, tid);
623         if (!req) {
624                 dout("get_generic_reply %lld dne\n", tid);
625                 *skip = 1;
626                 m = NULL;
627         } else {
628                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
629                 *skip = 0;
630                 m = ceph_msg_get(req->reply);
631                 /*
632                  * we don't need to track the connection reading into
633                  * this reply because we only have one open connection
634                  * at a time, ever.
635                  */
636         }
637         mutex_unlock(&monc->mutex);
638         return m;
639 }
640
641 /*
642  * statfs
643  */
644 static void handle_statfs_reply(struct ceph_mon_client *monc,
645                                 struct ceph_msg *msg)
646 {
647         struct ceph_mon_generic_request *req;
648         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
649         u64 tid = le64_to_cpu(msg->hdr.tid);
650
651         dout("%s msg %p tid %llu\n", __func__, msg, tid);
652
653         if (msg->front.iov_len != sizeof(*reply))
654                 goto bad;
655
656         mutex_lock(&monc->mutex);
657         req = lookup_generic_request(&monc->generic_request_tree, tid);
658         if (!req) {
659                 mutex_unlock(&monc->mutex);
660                 return;
661         }
662
663         req->result = 0;
664         *req->u.st = reply->st; /* struct */
665         __finish_generic_request(req);
666         mutex_unlock(&monc->mutex);
667
668         complete_generic_request(req);
669         return;
670
671 bad:
672         pr_err("corrupt statfs reply, tid %llu\n", tid);
673         ceph_msg_dump(msg);
674 }
675
676 /*
677  * Do a synchronous statfs().
678  */
679 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
680                         struct ceph_statfs *buf)
681 {
682         struct ceph_mon_generic_request *req;
683         struct ceph_mon_statfs *h;
684         int ret = -ENOMEM;
685
686         req = alloc_generic_request(monc, GFP_NOFS);
687         if (!req)
688                 goto out;
689
690         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
691                                     true);
692         if (!req->request)
693                 goto out;
694
695         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
696         if (!req->reply)
697                 goto out;
698
699         req->u.st = buf;
700         req->request->hdr.version = cpu_to_le16(2);
701
702         mutex_lock(&monc->mutex);
703         register_generic_request(req);
704         /* fill out request */
705         h = req->request->front.iov_base;
706         h->monhdr.have_version = 0;
707         h->monhdr.session_mon = cpu_to_le16(-1);
708         h->monhdr.session_mon_tid = 0;
709         h->fsid = monc->monmap->fsid;
710         h->contains_data_pool = (data_pool != CEPH_NOPOOL);
711         h->data_pool = cpu_to_le64(data_pool);
712         send_generic_request(monc, req);
713         mutex_unlock(&monc->mutex);
714
715         ret = wait_generic_request(req);
716 out:
717         put_generic_request(req);
718         return ret;
719 }
720 EXPORT_SYMBOL(ceph_monc_do_statfs);
721
722 static void handle_get_version_reply(struct ceph_mon_client *monc,
723                                      struct ceph_msg *msg)
724 {
725         struct ceph_mon_generic_request *req;
726         u64 tid = le64_to_cpu(msg->hdr.tid);
727         void *p = msg->front.iov_base;
728         void *end = p + msg->front_alloc_len;
729         u64 handle;
730
731         dout("%s msg %p tid %llu\n", __func__, msg, tid);
732
733         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
734         handle = ceph_decode_64(&p);
735         if (tid != 0 && tid != handle)
736                 goto bad;
737
738         mutex_lock(&monc->mutex);
739         req = lookup_generic_request(&monc->generic_request_tree, handle);
740         if (!req) {
741                 mutex_unlock(&monc->mutex);
742                 return;
743         }
744
745         req->result = 0;
746         req->u.newest = ceph_decode_64(&p);
747         __finish_generic_request(req);
748         mutex_unlock(&monc->mutex);
749
750         complete_generic_request(req);
751         return;
752
753 bad:
754         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
755         ceph_msg_dump(msg);
756 }
757
758 static struct ceph_mon_generic_request *
759 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
760                         ceph_monc_callback_t cb, u64 private_data)
761 {
762         struct ceph_mon_generic_request *req;
763
764         req = alloc_generic_request(monc, GFP_NOIO);
765         if (!req)
766                 goto err_put_req;
767
768         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
769                                     sizeof(u64) + sizeof(u32) + strlen(what),
770                                     GFP_NOIO, true);
771         if (!req->request)
772                 goto err_put_req;
773
774         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
775                                   true);
776         if (!req->reply)
777                 goto err_put_req;
778
779         req->complete_cb = cb;
780         req->private_data = private_data;
781
782         mutex_lock(&monc->mutex);
783         register_generic_request(req);
784         {
785                 void *p = req->request->front.iov_base;
786                 void *const end = p + req->request->front_alloc_len;
787
788                 ceph_encode_64(&p, req->tid); /* handle */
789                 ceph_encode_string(&p, end, what, strlen(what));
790                 WARN_ON(p != end);
791         }
792         send_generic_request(monc, req);
793         mutex_unlock(&monc->mutex);
794
795         return req;
796
797 err_put_req:
798         put_generic_request(req);
799         return ERR_PTR(-ENOMEM);
800 }
801
802 /*
803  * Send MMonGetVersion and wait for the reply.
804  *
805  * @what: one of "mdsmap", "osdmap" or "monmap"
806  */
807 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
808                           u64 *newest)
809 {
810         struct ceph_mon_generic_request *req;
811         int ret;
812
813         req = __ceph_monc_get_version(monc, what, NULL, 0);
814         if (IS_ERR(req))
815                 return PTR_ERR(req);
816
817         ret = wait_generic_request(req);
818         if (!ret)
819                 *newest = req->u.newest;
820
821         put_generic_request(req);
822         return ret;
823 }
824 EXPORT_SYMBOL(ceph_monc_get_version);
825
826 /*
827  * Send MMonGetVersion,
828  *
829  * @what: one of "mdsmap", "osdmap" or "monmap"
830  */
831 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
832                                 ceph_monc_callback_t cb, u64 private_data)
833 {
834         struct ceph_mon_generic_request *req;
835
836         req = __ceph_monc_get_version(monc, what, cb, private_data);
837         if (IS_ERR(req))
838                 return PTR_ERR(req);
839
840         put_generic_request(req);
841         return 0;
842 }
843 EXPORT_SYMBOL(ceph_monc_get_version_async);
844
845 static void handle_command_ack(struct ceph_mon_client *monc,
846                                struct ceph_msg *msg)
847 {
848         struct ceph_mon_generic_request *req;
849         void *p = msg->front.iov_base;
850         void *const end = p + msg->front_alloc_len;
851         u64 tid = le64_to_cpu(msg->hdr.tid);
852
853         dout("%s msg %p tid %llu\n", __func__, msg, tid);
854
855         ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
856                                                             sizeof(u32), bad);
857         p += sizeof(struct ceph_mon_request_header);
858
859         mutex_lock(&monc->mutex);
860         req = lookup_generic_request(&monc->generic_request_tree, tid);
861         if (!req) {
862                 mutex_unlock(&monc->mutex);
863                 return;
864         }
865
866         req->result = ceph_decode_32(&p);
867         __finish_generic_request(req);
868         mutex_unlock(&monc->mutex);
869
870         complete_generic_request(req);
871         return;
872
873 bad:
874         pr_err("corrupt mon_command ack, tid %llu\n", tid);
875         ceph_msg_dump(msg);
876 }
877
878 int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
879                             struct ceph_entity_addr *client_addr)
880 {
881         struct ceph_mon_generic_request *req;
882         struct ceph_mon_command *h;
883         int ret = -ENOMEM;
884         int len;
885
886         req = alloc_generic_request(monc, GFP_NOIO);
887         if (!req)
888                 goto out;
889
890         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
891         if (!req->request)
892                 goto out;
893
894         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
895                                   true);
896         if (!req->reply)
897                 goto out;
898
899         mutex_lock(&monc->mutex);
900         register_generic_request(req);
901         h = req->request->front.iov_base;
902         h->monhdr.have_version = 0;
903         h->monhdr.session_mon = cpu_to_le16(-1);
904         h->monhdr.session_mon_tid = 0;
905         h->fsid = monc->monmap->fsid;
906         h->num_strs = cpu_to_le32(1);
907         len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
908                                  \"blacklistop\": \"add\", \
909                                  \"addr\": \"%pISpc/%u\" }",
910                       &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
911         h->str_len = cpu_to_le32(len);
912         send_generic_request(monc, req);
913         mutex_unlock(&monc->mutex);
914
915         ret = wait_generic_request(req);
916 out:
917         put_generic_request(req);
918         return ret;
919 }
920 EXPORT_SYMBOL(ceph_monc_blacklist_add);
921
922 /*
923  * Resend pending generic requests.
924  */
925 static void __resend_generic_request(struct ceph_mon_client *monc)
926 {
927         struct ceph_mon_generic_request *req;
928         struct rb_node *p;
929
930         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
931                 req = rb_entry(p, struct ceph_mon_generic_request, node);
932                 ceph_msg_revoke(req->request);
933                 ceph_msg_revoke_incoming(req->reply);
934                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
935         }
936 }
937
938 /*
939  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
940  * renew/retry subscription as needed (in case it is timing out, or we
941  * got an ENOMEM).  And keep the monitor connection alive.
942  */
943 static void delayed_work(struct work_struct *work)
944 {
945         struct ceph_mon_client *monc =
946                 container_of(work, struct ceph_mon_client, delayed_work.work);
947
948         dout("monc delayed_work\n");
949         mutex_lock(&monc->mutex);
950         if (monc->hunting) {
951                 dout("%s continuing hunt\n", __func__);
952                 reopen_session(monc);
953         } else {
954                 int is_auth = ceph_auth_is_authenticated(monc->auth);
955                 if (ceph_con_keepalive_expired(&monc->con,
956                                                CEPH_MONC_PING_TIMEOUT)) {
957                         dout("monc keepalive timeout\n");
958                         is_auth = 0;
959                         reopen_session(monc);
960                 }
961
962                 if (!monc->hunting) {
963                         ceph_con_keepalive(&monc->con);
964                         __validate_auth(monc);
965                 }
966
967                 if (is_auth &&
968                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
969                         unsigned long now = jiffies;
970
971                         dout("%s renew subs? now %lu renew after %lu\n",
972                              __func__, now, monc->sub_renew_after);
973                         if (time_after_eq(now, monc->sub_renew_after))
974                                 __send_subscribe(monc);
975                 }
976         }
977         __schedule_delayed(monc);
978         mutex_unlock(&monc->mutex);
979 }
980
981 /*
982  * On startup, we build a temporary monmap populated with the IPs
983  * provided by mount(2).
984  */
985 static int build_initial_monmap(struct ceph_mon_client *monc)
986 {
987         struct ceph_options *opt = monc->client->options;
988         struct ceph_entity_addr *mon_addr = opt->mon_addr;
989         int num_mon = opt->num_mon;
990         int i;
991
992         /* build initial monmap */
993         monc->monmap = kzalloc(sizeof(*monc->monmap) +
994                                num_mon*sizeof(monc->monmap->mon_inst[0]),
995                                GFP_KERNEL);
996         if (!monc->monmap)
997                 return -ENOMEM;
998         for (i = 0; i < num_mon; i++) {
999                 monc->monmap->mon_inst[i].addr = mon_addr[i];
1000                 monc->monmap->mon_inst[i].addr.nonce = 0;
1001                 monc->monmap->mon_inst[i].name.type =
1002                         CEPH_ENTITY_TYPE_MON;
1003                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
1004         }
1005         monc->monmap->num_mon = num_mon;
1006         return 0;
1007 }
1008
1009 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1010 {
1011         int err = 0;
1012
1013         dout("init\n");
1014         memset(monc, 0, sizeof(*monc));
1015         monc->client = cl;
1016         monc->monmap = NULL;
1017         mutex_init(&monc->mutex);
1018
1019         err = build_initial_monmap(monc);
1020         if (err)
1021                 goto out;
1022
1023         /* connection */
1024         /* authentication */
1025         monc->auth = ceph_auth_init(cl->options->name,
1026                                     cl->options->key);
1027         if (IS_ERR(monc->auth)) {
1028                 err = PTR_ERR(monc->auth);
1029                 goto out_monmap;
1030         }
1031         monc->auth->want_keys =
1032                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1033                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1034
1035         /* msgs */
1036         err = -ENOMEM;
1037         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1038                                      sizeof(struct ceph_mon_subscribe_ack),
1039                                      GFP_KERNEL, true);
1040         if (!monc->m_subscribe_ack)
1041                 goto out_auth;
1042
1043         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1044                                          GFP_KERNEL, true);
1045         if (!monc->m_subscribe)
1046                 goto out_subscribe_ack;
1047
1048         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1049                                           GFP_KERNEL, true);
1050         if (!monc->m_auth_reply)
1051                 goto out_subscribe;
1052
1053         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1054         monc->pending_auth = 0;
1055         if (!monc->m_auth)
1056                 goto out_auth_reply;
1057
1058         ceph_con_init(&monc->con, monc, &mon_con_ops,
1059                       &monc->client->msgr);
1060
1061         monc->cur_mon = -1;
1062         monc->had_a_connection = false;
1063         monc->hunt_mult = 1;
1064
1065         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1066         monc->generic_request_tree = RB_ROOT;
1067         monc->last_tid = 0;
1068
1069         monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1070
1071         return 0;
1072
1073 out_auth_reply:
1074         ceph_msg_put(monc->m_auth_reply);
1075 out_subscribe:
1076         ceph_msg_put(monc->m_subscribe);
1077 out_subscribe_ack:
1078         ceph_msg_put(monc->m_subscribe_ack);
1079 out_auth:
1080         ceph_auth_destroy(monc->auth);
1081 out_monmap:
1082         kfree(monc->monmap);
1083 out:
1084         return err;
1085 }
1086 EXPORT_SYMBOL(ceph_monc_init);
1087
1088 void ceph_monc_stop(struct ceph_mon_client *monc)
1089 {
1090         dout("stop\n");
1091         cancel_delayed_work_sync(&monc->delayed_work);
1092
1093         mutex_lock(&monc->mutex);
1094         __close_session(monc);
1095         monc->cur_mon = -1;
1096         mutex_unlock(&monc->mutex);
1097
1098         /*
1099          * flush msgr queue before we destroy ourselves to ensure that:
1100          *  - any work that references our embedded con is finished.
1101          *  - any osd_client or other work that may reference an authorizer
1102          *    finishes before we shut down the auth subsystem.
1103          */
1104         ceph_msgr_flush();
1105
1106         ceph_auth_destroy(monc->auth);
1107
1108         WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1109
1110         ceph_msg_put(monc->m_auth);
1111         ceph_msg_put(monc->m_auth_reply);
1112         ceph_msg_put(monc->m_subscribe);
1113         ceph_msg_put(monc->m_subscribe_ack);
1114
1115         kfree(monc->monmap);
1116 }
1117 EXPORT_SYMBOL(ceph_monc_stop);
1118
1119 static void finish_hunting(struct ceph_mon_client *monc)
1120 {
1121         if (monc->hunting) {
1122                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1123                 monc->hunting = false;
1124                 monc->had_a_connection = true;
1125                 monc->hunt_mult /= 2; /* reduce by 50% */
1126                 if (monc->hunt_mult < 1)
1127                         monc->hunt_mult = 1;
1128         }
1129 }
1130
1131 static void handle_auth_reply(struct ceph_mon_client *monc,
1132                               struct ceph_msg *msg)
1133 {
1134         int ret;
1135         int was_auth = 0;
1136
1137         mutex_lock(&monc->mutex);
1138         was_auth = ceph_auth_is_authenticated(monc->auth);
1139         monc->pending_auth = 0;
1140         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1141                                      msg->front.iov_len,
1142                                      monc->m_auth->front.iov_base,
1143                                      monc->m_auth->front_alloc_len);
1144         if (ret > 0) {
1145                 __send_prepared_auth_request(monc, ret);
1146                 goto out;
1147         }
1148
1149         finish_hunting(monc);
1150
1151         if (ret < 0) {
1152                 monc->client->auth_err = ret;
1153         } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
1154                 dout("authenticated, starting session\n");
1155
1156                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1157                 monc->client->msgr.inst.name.num =
1158                                         cpu_to_le64(monc->auth->global_id);
1159
1160                 __send_subscribe(monc);
1161                 __resend_generic_request(monc);
1162
1163                 pr_info("mon%d %s session established\n", monc->cur_mon,
1164                         ceph_pr_addr(&monc->con.peer_addr.in_addr));
1165         }
1166
1167 out:
1168         mutex_unlock(&monc->mutex);
1169         if (monc->client->auth_err < 0)
1170                 wake_up_all(&monc->client->auth_wq);
1171 }
1172
1173 static int __validate_auth(struct ceph_mon_client *monc)
1174 {
1175         int ret;
1176
1177         if (monc->pending_auth)
1178                 return 0;
1179
1180         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1181                               monc->m_auth->front_alloc_len);
1182         if (ret <= 0)
1183                 return ret; /* either an error, or no need to authenticate */
1184         __send_prepared_auth_request(monc, ret);
1185         return 0;
1186 }
1187
1188 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1189 {
1190         int ret;
1191
1192         mutex_lock(&monc->mutex);
1193         ret = __validate_auth(monc);
1194         mutex_unlock(&monc->mutex);
1195         return ret;
1196 }
1197 EXPORT_SYMBOL(ceph_monc_validate_auth);
1198
1199 /*
1200  * handle incoming message
1201  */
1202 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1203 {
1204         struct ceph_mon_client *monc = con->private;
1205         int type = le16_to_cpu(msg->hdr.type);
1206
1207         if (!monc)
1208                 return;
1209
1210         switch (type) {
1211         case CEPH_MSG_AUTH_REPLY:
1212                 handle_auth_reply(monc, msg);
1213                 break;
1214
1215         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1216                 handle_subscribe_ack(monc, msg);
1217                 break;
1218
1219         case CEPH_MSG_STATFS_REPLY:
1220                 handle_statfs_reply(monc, msg);
1221                 break;
1222
1223         case CEPH_MSG_MON_GET_VERSION_REPLY:
1224                 handle_get_version_reply(monc, msg);
1225                 break;
1226
1227         case CEPH_MSG_MON_COMMAND_ACK:
1228                 handle_command_ack(monc, msg);
1229                 break;
1230
1231         case CEPH_MSG_MON_MAP:
1232                 ceph_monc_handle_map(monc, msg);
1233                 break;
1234
1235         case CEPH_MSG_OSD_MAP:
1236                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1237                 break;
1238
1239         default:
1240                 /* can the chained handler handle it? */
1241                 if (monc->client->extra_mon_dispatch &&
1242                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1243                         break;
1244                         
1245                 pr_err("received unknown message type %d %s\n", type,
1246                        ceph_msg_type_name(type));
1247         }
1248         ceph_msg_put(msg);
1249 }
1250
1251 /*
1252  * Allocate memory for incoming message
1253  */
1254 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1255                                       struct ceph_msg_header *hdr,
1256                                       int *skip)
1257 {
1258         struct ceph_mon_client *monc = con->private;
1259         int type = le16_to_cpu(hdr->type);
1260         int front_len = le32_to_cpu(hdr->front_len);
1261         struct ceph_msg *m = NULL;
1262
1263         *skip = 0;
1264
1265         switch (type) {
1266         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1267                 m = ceph_msg_get(monc->m_subscribe_ack);
1268                 break;
1269         case CEPH_MSG_STATFS_REPLY:
1270         case CEPH_MSG_MON_COMMAND_ACK:
1271                 return get_generic_reply(con, hdr, skip);
1272         case CEPH_MSG_AUTH_REPLY:
1273                 m = ceph_msg_get(monc->m_auth_reply);
1274                 break;
1275         case CEPH_MSG_MON_GET_VERSION_REPLY:
1276                 if (le64_to_cpu(hdr->tid) != 0)
1277                         return get_generic_reply(con, hdr, skip);
1278
1279                 /*
1280                  * Older OSDs don't set reply tid even if the orignal
1281                  * request had a non-zero tid.  Workaround this weirdness
1282                  * by falling through to the allocate case.
1283                  */
1284         case CEPH_MSG_MON_MAP:
1285         case CEPH_MSG_MDS_MAP:
1286         case CEPH_MSG_OSD_MAP:
1287         case CEPH_MSG_FS_MAP_USER:
1288                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1289                 if (!m)
1290                         return NULL;    /* ENOMEM--return skip == 0 */
1291                 break;
1292         }
1293
1294         if (!m) {
1295                 pr_info("alloc_msg unknown type %d\n", type);
1296                 *skip = 1;
1297         } else if (front_len > m->front_alloc_len) {
1298                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1299                         front_len, m->front_alloc_len,
1300                         (unsigned int)con->peer_name.type,
1301                         le64_to_cpu(con->peer_name.num));
1302                 ceph_msg_put(m);
1303                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1304         }
1305
1306         return m;
1307 }
1308
1309 /*
1310  * If the monitor connection resets, pick a new monitor and resubmit
1311  * any pending requests.
1312  */
1313 static void mon_fault(struct ceph_connection *con)
1314 {
1315         struct ceph_mon_client *monc = con->private;
1316
1317         mutex_lock(&monc->mutex);
1318         dout("%s mon%d\n", __func__, monc->cur_mon);
1319         if (monc->cur_mon >= 0) {
1320                 if (!monc->hunting) {
1321                         dout("%s hunting for new mon\n", __func__);
1322                         reopen_session(monc);
1323                         __schedule_delayed(monc);
1324                 } else {
1325                         dout("%s already hunting\n", __func__);
1326                 }
1327         }
1328         mutex_unlock(&monc->mutex);
1329 }
1330
1331 /*
1332  * We can ignore refcounting on the connection struct, as all references
1333  * will come from the messenger workqueue, which is drained prior to
1334  * mon_client destruction.
1335  */
1336 static struct ceph_connection *con_get(struct ceph_connection *con)
1337 {
1338         return con;
1339 }
1340
1341 static void con_put(struct ceph_connection *con)
1342 {
1343 }
1344
1345 static const struct ceph_connection_operations mon_con_ops = {
1346         .get = con_get,
1347         .put = con_put,
1348         .dispatch = dispatch,
1349         .fault = mon_fault,
1350         .alloc_msg = mon_alloc_msg,
1351 };